Commit d62f01e4 authored by Takhir Fakhrutdinov's avatar Takhir Fakhrutdinov

Создание задач по универсальной схеме

parent 5aba3e33
#!/opt/local/bin/python
# -*- coding: utf-8 -*-
import sys
import jsonschema as js
import json
fsn = sys.argv[1]
with open(fsn,'r') as fs:
schema = json.load(fs)
print ('OK =================')
js.validate(json.load(sys.stdin),schema)
......@@ -48,6 +48,8 @@ class baseEphemFile(baseFile):
:linenos:
:caption: load_data
"""
from .task import create_task_sql
logger.info('Загрузка файла...')
await super().load(fd,args)
self.fid = await self.getfid()
......@@ -73,7 +75,7 @@ class baseEphemFile(baseFile):
await self.create_doc()
await self.copy_to_table('t_ephemeris',source=fo,schema_name='main',delimiter='\t',columns=('fid','nko','bdt','sec','params','tp',))
if self.task_enabled:
task = await self.fetchval("select * from dbq.put_task(_tp=>$1,_payload=>json_build_object('fid',$2::bigint))",self.tasktyp,self.fid)
task = await self.fetchval(create_task_sql(),self.tasktyp,self.fid)
if task: logger.info(f"Создана задача {task} конвертации орбит, файл {self.args.fname} ({self.fid}).")
else: logger.warning(f"Не создана задача конвертации орбит, файл {self.args.fname} ({self.fid}).")
else: logger.info(f"Конвертация орбит запрещена, файл {self.args.fname} ({self.fid}).")
......
# -*- coding: utf-8 -*-
from .xFile import baseFile
import io
import logging
logger = logging.getLogger('')
class eptFile(baseFile):
def __init__(self,*args,**kwargs):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/msFile.py
:language: python
:lines: 297,306-311
:linenos:
:caption: __init__
"""
from .parsers import eopfile
super().__init__(*args,**kwargs)
self.parser = eopfile.parsefile
self.dbtype = 'ept'
async def load(self,fd,args):
""" Метод. Загрузка файла измерений в БД.
.. literalinclude:: ../../../../fte/lib/libpy/msFile.py
:language: python
:lines: 194,203-293
:linenos:
:caption: load_data
"""
from datetime import datetime,date,time
_2json = lambda obj: (
obj.isoformat()
if isinstance(obj, (datetime,date,time,))
else None
)
logger.info('Загрузка файла...')
await super().load(fd,args)
vc,sensor,tracks,self.tbeg,self.tend,file_params = self.parse_file(fd)
self.fileattr.update(file_params)
dev = await self.fetchval("select * from main.devget_id($1::integer,$2)",sensor,self.tbeg.date())
night = await self.fetchval("select * from main.devget_night($1,$2)",dev,self.tbeg)
self.devs = [dev]
self.fid = await self.getfid()
# Формируем файлы csv
fo = io.BytesIO()
for t in tracks:
trackid = t.params.pop('obj')
head = json.dumps(t.params,default=_2json,separators=(',', ':',))
async with self.transaction():
await self.execute("""
insert into xd.t_localzone(dev,night,fn)
select $1,$2,fn
from unnest($3::text[]) fn
on conflict do nothing
""",dev,night,t.files)
# получаем идентификаторы файлов
foid = await self.fetchval("""
with sf as (
select $1::integer dev,$2::date::timestamp night,fn,rn
from unnest($3::text[]) with ordinality v(fn,rn)
)
select array_agg(foid order by rn)
from xd.t_localzone join sf using(dev,night,fn)
""",dev,night,t.files)
_ = list()
for i in range(len(foid)):
_.append(r'"({},\\"{}\\")"'.format(foid[i],json.dumps(t.eop[i],default=_2json,separators=(',', ':',)).replace('"',r'\\"\\"')))
eop = '{%s}' % ','.join(_)
self.write(fo,(self.fid,trackid,dev,night,t.meas,head,eop,vc,))
fo.seek(0)
async with self.transaction():
await self.create_doc()
await self.copy_to_table('t_trackseop',source=fo,schema_name='xd',delimiter='\t')
This diff is collapsed.
......@@ -34,6 +34,8 @@ class lcvFile(measFile):
:linenos:
:caption: load_data
"""
from .task import create_task_seance_sql
logger.info('Загрузка файла...')
await super().load(fd,args)
......@@ -84,6 +86,6 @@ class lcvFile(measFile):
await self.copy_to_table('t_tracks',source=fo,schema_name='xd',delimiter='\t')
await self.copy_to_table('t_filetrack',source=fx,schema_name='xd',delimiter='\t',columns=['track','tm','fid','loaded'])
await self.copy_to_table('t_tracknci',source=fn,schema_name='xd',delimiter='\t')
task = await self.fetchval("select * from dbq.put_task_seance($1,$2)",self.fid,self.tasktyp)
task = await self.fetchval(create_task_seance_sql(),self.fid,self.tasktyp)
if task: logger.info(f"Создана задача {task} идентификации измерений {self.args.fname} ({self.fid}).")
else: logger.warning(f"Не создана задача идентификации измерений {self.args.fname} ({self.fid}).")
......@@ -31,7 +31,7 @@ class lczFile(baseFile):
real = lambda x: round(float(x),6)
nighttime = lambda x: datetime.strptime(x,'%Y%m%d')
timestamp = lambda x: datetime.strptime(x,'%d/%m/%Y %H:%M:%S.%f')
rec_list = { "siteid":int,"nightid":nighttime,"filename":str,"obstime":timestamp,"crops":str,}
rec_list = { "siteid":int,"nightid":nighttime,"filename":str,"obstime":timestamp,}
frame_list = {
"ra_j2000":real,"dec_j2000":real,"orientation":real,
"ast_ra_j2000":real,"ast_dec_j2000":real,"ast_rms_ra":real,"ast_rms_dec":real,"ast_orientation":real,
......@@ -44,7 +44,7 @@ class lczFile(baseFile):
frame = {k:f(r[k]) for k,f in frame_list.items() if k in r and r.get(k) is not None}
_ = {k:f(r[k]) for k,f in rec_list.items() if k in r}
rec = namedtuple('rec_t',_.keys())(**_)
crops = base64.b64decode(rec.crops)
crops = base64.b64decode(r['crops']) if 'crops' in r else None
if rec.nightid not in nights:
await self.execute("select * from template.partition_table('xd.t_localzone'::regclass,$1)",rec.nightid)
......
......@@ -217,6 +217,7 @@ class eop2mFile(meteoBaseFile):
:linenos:
:caption: load_data
"""
import base64
_cnv = lambda x: str(x) if x!=None else 'null'
logger.info('Загрузка файла...')
......@@ -224,85 +225,50 @@ class eop2mFile(meteoBaseFile):
data = json.load(fd)
fo,devs = io.BytesIO(), set()
for m in data:
r = m.get('meteo')
if r:
model,serial_number = r['metadata']['station'],r['metadata']['serial_number']
dev = await self.get_device(model,serial_number)
await self.check_priv_device(dev)
if dev not in devs:
devs.add(dev)
atypes = await self.get_atypes(dev)
tm = datetime.fromisoformat(r['data']['date']).replace(microsecond=0)
self.tbeg, self.tend = min(self.tbeg,tm), max(self.tend,tm)
d = r['data']
row = '{{{}}}'.format(','.join(map(_cnv,[d.get(k) for k in atypes.keys()])))
self.write(fo,(tm,dev,row))
self.fid = await self.getfid()
self.fileattr = dict(devs=list(devs))
fo.seek(0)
async with self.transaction():
await self.create_doc()
await self.copy_to_table('t_devmetrix',source=fo,schema_name='xd',delimiter='\t')
class allskyFile(meteoBaseFile):
""" Загрузка данных ЭОП 2М
"""
def __init__(self,*args,**kwargs):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python
:lines: 229,238-240
:linenos:
:caption: __init__
"""
super().__init__(*args,**kwargs)
self.dbtype = 'asky'
async def load(self,fd,args):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/weaFile.py
:language: python
:lines: 27,36-
:linenos:
:caption: load_data
"""
import base64
logger.info('Загрузка файла...')
await super().load(fd,args)
data = json.load(fd)
devs = set()
self.fid = await self.getfid()
async with self.transaction():
for m in data:
r = m.get('meteo')
if r:
crops = base64.b64decode(r['allsky'])
model,serial_number = r['metadata']['station'],r['metadata']['serial_number']
tm = datetime.fromisoformat(m['header']['creation_date']).replace(microsecond=0)
self.tbeg, self.tend = min(self.tbeg,tm), max(self.tend,tm)
r = m['meteo']
allsky = r['data'].get('allsky')
if allsky:
originator = m['header']['originator']
if originator == "EOP-2M-1":
model,serial_number = 'Web-camera TPLINK','12345678'
else: raise RuntimeError(f'Неизвестная камера для {originator}')
dev = await self.get_device(model,serial_number)
await self.check_priv_device(dev)
devs.add(dev)
if dev not in devs:
await self.check_priv_device(dev)
devs.add(dev)
tm = datetime.fromisoformat(m['header']['creation_date']).replace(microsecond=0)
self.tbeg, self.tend = min(self.tbeg,tm), max(self.tend,tm)
crops = base64.b64decode(allsky)
night = await self.fetchval("select * from main.devget_night($1,$2)",dev,tm)
await self.execute("select * from template.partition_table('xd.t_allsky'::regclass,$1)",tm)
await self.execute("insert into xd.t_allsky values($1,$2,$3,$4,$5)",self.fid,dev,tm,night,crops)
else:
model,serial_number = r['metadata']['station'],r['metadata']['serial_number']
dev = await self.get_device(model,serial_number)
if dev not in devs:
await self.check_priv_device(dev)
devs.add(dev)
atypes = await self.get_atypes(dev)
tm = datetime.fromisoformat(r['data']['date']).replace(microsecond=0)
self.tbeg, self.tend = min(self.tbeg,tm), max(self.tend,tm)
d = r['data']
row = '{{{}}}'.format(','.join(map(_cnv,[d.get(k) for k in atypes.keys()])))
self.write(fo,(tm,dev,row))
self.fileattr = dict(devs=list(devs))
fo.seek(0)
await self.create_doc()
await self.copy_to_table('t_devmetrix',source=fo,schema_name='xd',delimiter='\t')
class allsky_hcFile(meteoBaseFile):
""" Загрузка данных ЭОП 2М
class eop1mFile(meteoBaseFile):
""" Загрузка данных ЭОП
"""
def __init__(self,*args,**kwargs):
......@@ -315,7 +281,8 @@ class allsky_hcFile(meteoBaseFile):
:caption: __init__
"""
super().__init__(*args,**kwargs)
self.dbtype = 'asky'
self.dbtype = 'me1'
async def load(self,fd,args):
""" Метод. Загрузка файла.
......@@ -326,26 +293,33 @@ class allsky_hcFile(meteoBaseFile):
:linenos:
:caption: load_data
"""
import base64
from numpy import float32
_cnv = lambda x: str(float32(x)) if x!=None else 'null'
logger.info('Загрузка файла...')
await super().load(fd,args)
data = json.load(fd)
fo,devs = io.BytesIO(), set()
self.fid = await self.getfid()
for m in data:
r = m['meteo']
model = 'DavisVantage' if 'TOutside' in r['data'] else 'CloudWatcher'
serial_number = r['hostid']
dev = await self.get_device(model,serial_number)
if dev not in devs:
await self.check_priv_device(dev)
devs.add(dev)
atypes = await self.get_atypes(dev)
tm = datetime.strptime(r['data']['Date'],'%d/%m/%Y %H:%M:%S')
self.tbeg, self.tend = min(self.tbeg,tm), max(self.tend,tm)
d = r['data']
row = '{{{}}}'.format(','.join(map(_cnv,[d.get(k) for k in atypes.keys()])))
self.write(fo,(tm,dev,row))
dev = await self.get_device('Web-camera TPLINK','12345678')
await self.check_priv_device(dev)
devs.add(dev)
self.fileattr = dict(devs=list(devs))
fo.seek(0)
self.fid = await self.getfid()
async with self.transaction():
for m in data:
crops = base64.b64decode(m['allsky'])
tm = datetime.fromisoformat(m['header']['creation_date']).replace(microsecond=0)
self.tbeg, self.tend = min(self.tbeg,tm), max(self.tend,tm)
night = await self.fetchval("select * from main.devget_night($1,$2)",dev,tm)
await self.execute("select * from template.partition_table('xd.t_allsky'::regclass,$1)",tm)
await self.execute("insert into xd.t_allsky values($1,$2,$3,$4,$5)",self.fid,dev,tm,night,crops)
self.fileattr = dict(devs=list(devs))
await self.create_doc()
await self.copy_to_table('t_devmetrix',source=fo,schema_name='xd',delimiter='\t')
......@@ -21,10 +21,11 @@ class msFile(measFile):
:caption: load_data
"""
from datetime import datetime,date,time
from .task import create_task_seance_sql
_2json = lambda obj: (
obj.isoformat()
if isinstance(obj, (datetime,date,time,))
else str(obj) if isinstance(obj, Decimal)
else None
)
......@@ -98,7 +99,7 @@ class msFile(measFile):
if not (skip_res and self.dbtype == 'om'):
await self.copy_to_table('t_tracks',source=fo,schema_name='xd',delimiter='\t')
await self.copy_to_table('t_filetrack',source=fx,schema_name='xd',delimiter='\t',columns=['track','tm','fid','loaded'])
task = await self.fetchval("select * from dbq.put_task_seance($1,$2)",self.fid,self.tasktyp)
task = await self.fetchval(create_task_seance_sql(),self.fid,self.tasktyp)
if task: logger.info(f"Создана задача {task} идентификации измерений {self.args.fname} ({self.fid}).")
else: logger.warning(f"Не создана задача идентификации измерений {self.args.fname} ({self.fid}).")
......
# -*- coding: utf-8 -*-
import logging
import json
import jsonschema as js
from datetime import datetime
import hashlib as hs
import base64
from collections import namedtuple
logger = logging.getLogger()
real = lambda x: round(float(x),6)
track_list = {'sigma':real,'distrust':bool,'fake':bool}
track_list = {'sigma':real,'distrust':bool,'fake':int}
bind_list = {'time':real,'id':int,'along':real,'across':real,'dmag':real}
meas_list = {'ra_j2000':float,'dec_j2000':float,'mag':float,'ra_j2000_full_error':float,'dec_j2000_full_error':float,'mag_error':float,}
eop_list = {'ra_j2000_error':real,'dec_j2000_error':real,'x':real,'x_error':real,'y':real,'y_error':real,
'fwhm_x':real,'fwhm_x_error':real,'fwhm_y':real,'fwhm_y_error':real,
'rot':real,'snr':real,'peak_snr':real,'flux':real,'ins_mag':real,'aper_width':real,'aper_height':real,
'rot':real,'snr':real,'peak_snr':real,'flux':real,'inst_mag':real,'aper_width':real,'aper_height':real,
'channel':int,'filter':str,
}
full_schema = {
"type": "object",
"required": ["nightid","siteid","tracks"],
"properties": {
"nightid": {"type": "integer"},
"siteid": {"type": "integer"},
"tracks": {
"type": "array",
"items": {
"allOf":[
{
"type": "object",
"required": ["distrust","sigma","trackid","mag"],
"properties": {
"bind": {"$ref": "#/$defs/bind"},
"distrust": {"type": "integer"},
"trackid": {"type": "integer"},
"sigma": {"$ref": "#/$defs/number"},
"mag": {"$ref": "#/$defs/number"},
"fake":{"type": "integer"},
"meas": {
"type": "array",
"items": {
"allOf":[
{
"type": "object",
"required": [
"aper_height", "aper_width", "channel", "crops", "dec_j2000", "dec_j2000_error",
"dec_j2000_full_error", "filename", "flux", "fwhm_x", "fwhm_x_error", "fwhm_y",
"fwhm_y_error", "inst_mag", "mag", "mag_error",
"peak_snr", "ra_j2000", "ra_j2000_error", "ra_j2000_full_error",
"rot", "snr", "utc", "x", "x_error", "y", "y_error"
],
"properties": {
"aper_height": {"$ref": "#/$defs/number"},
"aper_width": {"$ref": "#/$defs/number"},
"channel": {"type": "integer"},
"crops": {"$ref": "#/$defs/base64"},
"dec_j2000": {"$ref": "#/$defs/number"},
"dec_j2000_error": {"$ref": "#/$defs/number"},
"dec_j2000_full_error": {"$ref": "#/$defs/number"},
"filename": {"type": "string"},
"flux":{"$ref": "#/$defs/number"},
"fwhm_x": {"$ref": "#/$defs/number"},
"fwhm_x_error": {"$ref": "#/$defs/number"},
"fwhm_y": {"$ref": "#/$defs/number"},
"fwhm_y_error": {"$ref": "#/$defs/number"},
"inst_mag": {"$ref": "#/$defs/number"},
"mag": {"$ref": "#/$defs/number"},
"mag_error": {"$ref": "#/$defs/number"},
"peak_snr": {"$ref": "#/$defs/number"},
"ra_j2000": {"$ref": "#/$defs/number"},
"ra_j2000_error": {"$ref": "#/$defs/number"},
"ra_j2000_full_error": {"$ref": "#/$defs/number"},
"rot": {"$ref": "#/$defs/number"},
"snr": {"$ref": "#/$defs/number"},
"utc": {"$ref": "#/$defs/datetime"},
"x": {"$ref": "#/$defs/number"},
"x_error": {"$ref": "#/$defs/number"},
"y": {"$ref": "#/$defs/number"},
"y_error": {"$ref": "#/$defs/number"},
"filter":{"type": "string"}
}
}
]
}
}
}
}
]
}
}
},
"$defs": {
"timestamp": {"type" : "string","pattern" : "^(\\d{4})-(\\d{2})-(\\d{2})([T ](\\d{2}):(\\d{2})(:(\\d{2}(?:\\.\\d+)?))?)?$"},
"datetime": {"type":"string","pattern":"^\\d{2}\\/\\d{2}\\/\\d{4}\\s\\d{2}\\:\\d{2}\\:\\d{2}(\\.\\d+)?$"},
"base64": {"type":"string","pattern":"^(?=(.{4})*$)[A-Za-z0-9+\\/]*={0,2}$"},
"integer": {"type":"string","pattern":"^[-+]?\\d+$"},
"number": {"anyOf":[{"type":"string","pattern":"^[-+]?[0-9]*\\.?[0-9]+([eE][-+]?[0-9]+)?$"},{"type":"number"}]},
"bind": {"anyOf":[
{
"type": "object",
"required": ["across", "along", "id", "time"],
"properties": {
"across": {"type": "number"},
"along": {"type": "number"},
"dmag": {"type": "number"},
"id": {"$ref": "#/$defs/integer"},
"time": {"type": "number"}
}
},
{"type":"null"},
{"type":"string","pattern":"^$"}
]}
}
}
def _hms(_angle):
""" Метод. Преобразование угла из градусной меры в часовую.
......@@ -83,7 +184,7 @@ def _get_meas_t(meas,trackid):
:linenos:
:caption: _get_meas_t
"""
ms,mags,files,eop,tb,te,_times = list(),list(),list(),list(),datetime.max, datetime.min,set()
ms,files,eop,tb,te,_times = list(),list(),list(),datetime.max,datetime.min,set()
for m in meas:
if m['utc'] in _times:
......@@ -95,10 +196,6 @@ def _get_meas_t(meas,trackid):
files.append(m['filename'])
tb,te = min(tb,tm),max(te,tm)
rec = [v(m[k]) if k != 'mag' else v(m.get(k,0)) for k,v in meas_list.items()]
mag = float(m.get('mag',0.0))
if mag != 0: mags.append(mag)
eop.append({k:f(m[k]) for k,f in eop_list.items() if k in m and m.get(k) is not None})
if 'crops' in m:
crops = r'\\\\\\\\x%s' % base64.b64decode(m['crops']).hex()
......@@ -106,24 +203,26 @@ def _get_meas_t(meas,trackid):
else:
ms.append(r'"(\\"%s\\",\\"%s\\",)"' % (tm.isoformat(),str(rec).replace('[','{').replace(']','}'),))
avg_m = round(sum(mags)/len(mags),1) if len(mags) else 0.0
return tb,te,avg_m,'{%s}' % ','.join(ms),files,eop
return tb,te,'{%s}' % ','.join(ms),files,eop
def parsefile(f):
tracks,tbeg,tend,mcnt = list(),datetime.max,datetime.min,0
data = json.load(f)
siteid = None
for t in data['tracks']:
if not siteid: siteid = t['siteid']
js.validate(data,full_schema)
siteid = data['siteid']
#night = datetime.strptime(str(data['nightid']),'%Y%m%d').date()
for t in data['tracks']:
b = t.get('bind')
tb,te,avg_m,ms,files,eop = _get_meas_t(t['meas'],t['trackid'])
tb,te,ms,files,eop = _get_meas_t(t['meas'],t['trackid'])
tbeg,tend,cnt = min(tbeg,tb),max(tend,te),len(t['meas'])
mcnt += cnt
# Формируем проводку
params = {k:f(t[k]) for k,f in track_list.items() if k in t and t.get(k) is not None}
if b: params.update(dict(bind={k:f(b[k]) for k,f in bind_list.items() if k in b and b.get(k) is not None}))
params.update(dict(cnt=cnt,mag=avg_m,obj=t['trackid'],area=t.get('target')))
params.update(dict(obj=t['trackid'],cnt=cnt,mag=real(t.get('mag',0)),area=t.get('target')))
if t.get('joinedtrackid'):
params['jtrack'] = t.get('joinedtrackid')
tr = dict(
tm=te,
meas=ms,
......@@ -141,3 +240,4 @@ def parsefile(f):
......@@ -31,7 +31,6 @@ import re
from datetime import datetime,time
import hashlib as hs
import logging
from decimal import Decimal
logger = logging.getLogger('')
......
# -*- coding: utf-8 -*-
import logging
import json
import jsonschema as js
from datetime import datetime
from collections import namedtuple
logger = logging.getLogger()
full_schema = {
"type": "array",
"items": {
"allOf": [
{
"type": "object",
"required": ["header","verify"],
"properties": {
"header": {
"type": "object",
"additionalProperties": False,
"required": ["originator","message_type","message_id"],
"properties": {
"creation_date":{"$ref": "#/$defs/timestamp"},
"originator": {"type": "string"},
"message_type": {"type": "string","enum":["verify"]},
"message_id": {"type": "string"},
"time_system": {"type": "string","enum":["UTC"]},
"comment": {"type": "string"},
"shift_boss":{"type": "string"}
}
},
"verify":{
"type":"object",
"additionalProperties": False,
"anyOf":[
{"required":["night","site","fake"],"not":{"required":["joined"]}},
{"required":["night","site","trust"]}
],
"properties": {
"night" : {"$ref": "#/$defs/date"},
"site" : {"$ref": "#/$defs/integer"},
"trust":{
"type":"array",
"items": {"allOf":[{"type":"integer"}]}
},
"fake":{
"type":"array",
"items": {"allOf":[{"type":"integer"}]}
},
"joined":{
"type":"array",
"items":{
"allOf":[
{
"type":"object",
"additionalProperties": False,
"required":["trackid","tracks"],
"properties":{
"trackid":{"type":"integer"},
"tracks":{
"type":"array",
"items": {"allOf":[{"type":"integer"}]}
}
}
}
]
}
}
}
}
}
}
]
},
"$defs": {
"timestamp": {"type" : "string","pattern" : "^(\\d{4})-(\\d{2})-(\\d{2})([T ](\\d{2}):(\\d{2})(:(\\d{2}(?:\\.\\d+)?))?)?$"},
"date": {"type":"string","pattern":"^(\\d{4})-(\\d{2})-(\\d{2})$"},
"integer": {"anyOf":[{"type":"string","pattern":"^[-+]?\\d+$"},{"type":"integer"}]}
}
}
Record = namedtuple('Record',['telno','night','trackid','joined','fake'])
def parsefile(f):
data = json.load(f)
js.validate(data,full_schema)
header = data[0]['header']
records = list()
for ms in data:
r = ms['verify']
night = datetime.fromisoformat(r['night']).date()
telno = int(r['site'])
recs = dict()
if 'joined' in r:
for l in r['joined']:
trackid = l['trackid']
recs[trackid] = [r'\N',1]
for t in l['tracks']:
recs[t] = [str(trackid),1]
if 'trust' in r:
for t in r['trust']:
if t in recs: recs[t][1] = 0
else: recs[t]=[r'\N',0]
if 'fake' in r:
for t in r['fake']:
if t in recs: recs[t][1] = 1
else: recs[t]=[r'\N',1]
for k,v in recs.items():
records.append(Record._make((telno,night,k,v[0],v[1])))
return records,header
# -*- coding: utf-8 -*-
_QUEUE_='dbq' # 'ctl'
_zotranslate = dict(dbq="select * from dbq.put_tasks_zotranslate($1)",ctl="select ctl.create_tasks_zo_translate($1)")
def create_task_zotraslate_sql():
return _zotranslate[_QUEUE_]
_tletranslate = dict(
dbq="select * from dbq.put_tasks_tletranslate($1)",
ctl="insert into ctl.t_pendingtask(tp,icp) values(1,json_build_object('orbits',array_to_string($1,','),'type','tle')) returning 1"
)
def create_task_tletraslate_sql():
return _tletranslate[_QUEUE_]
_seance = dict(dbq="select * from dbq.put_task_seance($1,$2)",ctl="select * from ctl.create_tasks_seance($1)")
def create_task_seance_sql():
return _seance[_QUEUE_]
_mail_4302 = dict(
dbq="""
select dbq.put_tasks_mail(
_pub => 4302
,_attach=>json_build_array(
json_build_object(
'fname',format('TC2LE_%s_%s.tc',to_char('now'::timestamp,'YYMMDD_HH24MISS'),77),
'data',array_agg(trim(l) order by id,substr(l,1,1))
)
)
)
from (
select id,unnest(array[tline1,tline2])l
from tle.t_tle
where id = any($1)
) r
""",
ctl="""
select ctl.create_tasks_mail(
4302
,_attach=>json_build_array(
json_build_object(
'fname',format('TC2LE_%s_%s.tc',to_char('now'::timestamp,'YYMMDD_HH24MISS'),77),
'data',array_agg(trim(l) order by id,substr(l,1,1))
)
)
)
from (
select id,unnest(array[tline1,tline2])l
from tle.t_tle
where id = any(%1)
) r
"""
)
def create_task_mail_TC2LE_sql():
return _mail_4302[_QUEUE_]
_mail_4400 = dict(
dbq="select * from dbq.put_tasks_mail(_pub=>4400,_p1=>$1,_event=>$2)",
ctl="select * from ctl.create_tasks_mail(_pub=>4400,_p1=>$1,_event=>$2)"
)
def create_task_mail_extdata_sql():
return _mail_4400[_QUEUE_]
_mail_1010 = dict(
dbq="select * from dbq.put_tasks_mail(_pub=>1010,_p1=>$1,_event=>$2)",
ctl="select * from ctl.create_tasks_mail(_pub=>1010,_p1=>$1,_event=>$2)"
)
def create_task_mail_meas_file_sql():
return _mail_1010[_QUEUE_]
_mail_1011 = dict(
dbq="select * from dbq.put_tasks_mail(_pub=>1011,_p1=>$1,_event=>$2,_message=>$3::json)",
ctl="select * from ctl.create_tasks_mail(_pub=>1011,_p1=>$1,_event=>$2,_message=>$3::json)"
)
def create_task_mail_meas_original_sql():
return _mail_1011[_QUEUE_]
_mail_1020 = dict(
dbq="select * from dbq.put_tasks_mail(_pub=>1020,_p1=>$1,_event=>$2)",
ctl="select * from ctl.create_tasks_mail(_pub=>1020,_p1=>$1,_event=>$2)"
)
def create_task_mail_zo_report_sql():
return _mail_1020[_QUEUE_]
_task = dict(
dbq="select * from dbq.put_task(_tp=>$1,_payload=>json_build_object('fid',$2::bigint))",
ctl="select ctl.create_task($1,$2,0,$2::text)"
)
def create_task_sql():
return _task[_QUEUE_]
......@@ -67,6 +67,8 @@ class tleFile(baseFile):
:linenos:
:caption: load_data
"""
from .task import create_task_tletraslate_sql, create_task_mail_TC2LE_sql
part = lambda x : datetime(x.year,x.month,1)
logger.info('Загрузка файла...')
self.tai_utc = await self.fetch("select dt::timestamp,tai_utc*'1s'::interval from main.t_leapsec where dt>='1972-01-01' order by dt desc")
......@@ -92,26 +94,11 @@ class tleFile(baseFile):
schema,table = tbl.split('.')
await self.copy_to_table(table,source=v,schema_name='tle',delimiter='\t')
cnt = await self.fetchval("select * from dbq.put_tasks_tletranslate($1)",ids);
cnt = await self.fetchval(create_task_tletraslate_sql(),ids);
logger.info(f"Создано {cnt} задач(а) конвертации орбит, файл {self.args.fname} ({self.fid}).")
# создаем задачу отправки сообщений TC2LE
cnt = await self.fetchval("""
select dbq.put_tasks_mail(
_pub => 4302
,_attach=>json_build_array(
json_build_object(
'fname',format('TC2LE_%s_%s.tc',to_char('now'::timestamp,'YYMMDD_HH24MISS'),77),
'data',array_agg(trim(l) order by id,substr(l,1,1))
)
)
)
from (
select id,unnest(array[tline1,tline2])l
from tle.t_tle
where id = any($1)
) r
""",ids)
cnt = await self.fetchval(create_task_mail_TC2LE_sql(),ids)
logger.info(f"Создано {cnt} задач отправки сообщений TC2LE ОЭК ОКМ...")
......
# -*- coding: utf-8 -*-
from .xFile import baseFile
import io
import logging
logger = logging.getLogger('')
class eptFile(baseFile):
def __init__(self,*args,**kwargs):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/msFile.py
:language: python
:lines: 297,306-311
:linenos:
:caption: __init__
"""
from .parsers import verifyfile
super().__init__(*args,**kwargs)
self.parser = verifyfile.parsefile
self.dbtype = 'vfy'
self.tasktyp = 48
async def load(self,fd,args):
""" Метод. Загрузка файла измерений в БД.
.. literalinclude:: ../../../../fte/lib/libpy/msFile.py
:language: python
:lines: 194,203-293
:linenos:
:caption: load_data
"""
from .task import create_task_seance_sql
logger.info('Загрузка файла...')
await super().load(fd,args)
records,file_params = self.parse_file(fd)
self.fileattr.update(file_params)
tels = set()
self.devs = list()
nights = set()
tables = dict()
self.fid = await self.getfid()
for r in records:
if r.telno not in tels:
dev = await self.fetchval("select * from main.devget_id($1::integer,$2)",r.telno,r.night)
self.check_privilege_device(dev)
self.devs.append(dev)
tels.add(r.telno)
if r.night not in nights:
tb,te = await self.fetchrow("select lower(nr),upper(nr) from main.devget_nightrange(%s,%s) nr",(dev,r.night,))
self.tbeg,self.tend = min(self.tbeg,tb),max(self.tend,te)
table = await self.fetchval("select * from template.relname('xd.t_trackverify',$1::timestamp)",(r.night,))
if table not in tables:
tables[table] = io.BytesIO()
async with self.transaction():
await self.execute("select * from template.partition_table('xd.t_trackverify'::regclass,$1::timestamp)",r.night)
nights.add(r.night)
self.write(tables[table],(self.fid,dev,r.night,r.trackid,r.joined,r.fake))
async with self.transaction():
await self.create_doc()
await.self.execute("create temp table _t1 (like xd.t_trackverify) on commit drop")
for tbl,fo in tables.item():
fo.seek(0)
await self.execute("truncate _t1")
await self.copy_to_table('t_trackverify',source=fo,schema_name='xd',delimiter='\t')
await self.execute("""
insert into {}
select *
from _t1
on conflict (dev,night,trackid) do
update set (fid,fake)=(excluded.fid,excluded.fake)
""".format(tbl))
await self.execute("select * from xd.track_verifiedproc($1)",self.fid)
await self.execute(create_task_seance_sql(),self.fid,self.tasktyp)
......@@ -89,7 +89,6 @@ class baseFile(Connection):
_2json = lambda obj: (
obj.isoformat()
if isinstance(obj, (datetime,date,time,))
else str(obj) if isinstance(obj, Decimal)
else None
)
......@@ -157,6 +156,8 @@ class baseFile(Connection):
:linenos:
:caption: create_doc
"""
from .task import create_task_mail_extdata_sql
self.doctm = datetime.utcnow()
self.log.seek(0)
self.fileattr.update(dict(log=self.log.read(),subj=self.args.subject,rem=self.args.comment))
......@@ -175,7 +176,7 @@ class baseFile(Connection):
)
logger.info(f"Создан документ номер:{self.fid} ...")
if self.dbtype in baseFile.pubfiles:
task = await self.fetchval("select * from dbq.put_tasks_mail(_pub=>4400,_p1=>$1,_event=>$2)",baseFile.pubfiles[self.dbtype],str(self.fid))
task = await self.fetchval(create_task_mail_extdata_sql(),baseFile.pubfiles[self.dbtype],str(self.fid))
if task: logger.info(f"Создана задача {task} отправки файла внешних данных {self.args.fname} номер:{self.fid} ...")
......@@ -276,6 +277,8 @@ class measFile(baseFile):
return _crc
async def create_doc(self):
from .task import create_task_mail_meas_file_sql, create_task_mail_meas_original_sql
state = await self.execute("""update main.t_files set state = 0
where ftype='zr' and tels && $1 and utr && tsrange($2,$3,'[]')
and utr != tsrange(null,null)""",self.devs,self.tbeg,self.tend)
......@@ -284,8 +287,8 @@ class measFile(baseFile):
self.state |= 512
await super().create_doc()
body = json.dumps(self.args.comment.split('\n')) #???
task = await self.fetchval("select * from dbq.put_tasks_mail(_pub=>1011,_p1=>$1,_event=>$2,_message=>$3::json)",self.devs[0],str(self.fid),body)
task = await self.fetchval(create_task_mail_meas_original_sql(),self.devs[0],str(self.fid),body)
logger.info(f"Создано {task} задач(а) отправки файла измерений {self.args.fname} ({self.fid}).")
task = await self.fetchval("select * from dbq.put_tasks_mail(_pub=>1010,_p1=>$1,_event=>$2)",self.devs[0],str(self.fid))
task = await self.fetchval(create_task_mail_meas_file_sql(),self.devs[0],str(self.fid))
logger.info(f"Создано {task} задач(а) отправки оригинального файла измерений {self.args.fname} ({self.fid}).")
......@@ -59,6 +59,8 @@ class zoFile(baseFile):
:linenos:
:caption: load_data
"""
from .task import create_task_zotraslate_sql
dbt = dict(ALL='za',GEO='zg',HEO='zv',LEO='zn',CU='cu',PUBLIC='zo')
logger.info('Загрузка файла...')
self.args = args
......@@ -87,7 +89,7 @@ class zoFile(baseFile):
await self.create_doc()
await self.copy_to_table('t_exorbits',source=fo,schema_name='main',delimiter='\t')
# запуск задач конвертации орбит КО во внутренний формат
cnt = await self.fetchval("select * from dbq.put_tasks_zotranslate($1)",self.fid)
cnt = await self.fetchval(create_task_zotraslate_sql(),self.fid)
if cnt: logger.info(f"Создано {cnt} задач(а) конвертации внешних орбит {self.args.fname} ({self.fid}).")
else: raise RuntimeError("Не создано ни одной задачи конвертации внешних орбит")
# Изменение справочников КО
......@@ -171,11 +173,11 @@ class zrFile(baseFile):
import json
from datetime import datetime,date,time
from re import findall
from .task import create_task_mail_zo_report_sql
_2json = lambda obj: (
obj.isoformat()
if isinstance(obj, (datetime,date,time,))
else str(obj) if isinstance(obj, Decimal)
else None
)
......@@ -207,7 +209,7 @@ class zrFile(baseFile):
await self.create_doc()
for dev in self.devs:
await self.copy_to_table('t_exreport',source=fo,schema_name='main',delimiter='\t',columns=('fid','kiamno','nipno','params',))
task = await self.fetchval("select * from dbq.put_tasks_mail(_pub=>1020,_p1=>$1,_event=>$2)",dev,str(self.fid))
task = await self.fetchval(create_task_mail_zo_report_sql(),dev,str(self.fid))
logger.info(f"Создано {task} задач(а) отправки отчёта по сеансу измерений {self.args.fname} ({self.fid}).")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment