Commit e7a84a82 authored by Takhir Fakhrutdinov's avatar Takhir Fakhrutdinov

Обработка eop-json

parent aa539bea
This diff is collapsed.
...@@ -61,9 +61,10 @@ class lcvFile(measFile): ...@@ -61,9 +61,10 @@ class lcvFile(measFile):
_id = await self.fetchval("select * from nextval('xd.t_tracks_track_seq'::regclass)") _id = await self.fetchval("select * from nextval('xd.t_tracks_track_seq'::regclass)")
self.write(fo,(_id,tm,dev,self.vc,utr,meas,head,night,)) self.write(fo,(_id,tm,dev,self.vc,utr,meas,head,night,))
self.write(fn,(_id,tm,t['nci']['utr'],t['nci']['data'],)) self.write(fn,(_id,tm,t['nci']['utr'],t['nci']['data'],))
tracks_crc[crc] = _id tracks_crc[crc] = dict(track=_id,tm=tm)
else: else:
_id = tracks_crc[crc] _id = tracks_crc[crc]['track']
tm = tracks_crc[crc]['tm']
# проверка, есть ли НКИ? # проверка, есть ли НКИ?
nki = await self.fetchval("select true from xd.t_tracknci where track=$1 and tm=$2",_id,tm) nki = await self.fetchval("select true from xd.t_tracknci where track=$1 and tm=$2",_id,tm)
if not nki: if not nki:
......
...@@ -37,6 +37,7 @@ class msFile(measFile): ...@@ -37,6 +37,7 @@ class msFile(measFile):
await self.check_priv_device(dev) await self.check_priv_device(dev)
night = await self.fetchval("select * from main.devget_night($1,$2)",dev,self.tbeg) night = await self.fetchval("select * from main.devget_night($1,$2)",dev,self.tbeg)
skip_res = await self.fetchval("select * from main.devget_skipres($1,$2)",dev,night)
self.devs = [dev] self.devs = [dev]
self.fid = await self.getfid() self.fid = await self.getfid()
tracks_crc = await self.get_db_tracks_crc(dev) tracks_crc = await self.get_db_tracks_crc(dev)
...@@ -45,28 +46,34 @@ class msFile(measFile): ...@@ -45,28 +46,34 @@ class msFile(measFile):
fo, fx= io.BytesIO(),io.BytesIO() fo, fx= io.BytesIO(),io.BytesIO()
for t in tracks: for t in tracks:
_loaded = t.crc not in tracks_crc _loaded = t.crc not in tracks_crc
tm = t.tm
if _loaded: if _loaded:
head = json.dumps(t.params,default=_2json,separators=(',', ':',)) head = json.dumps(t.params,default=_2json,separators=(',', ':',))
_id = await self.fetchval("select * from nextval('xd.t_tracks_track_seq'::regclass)") _id = await self.fetchval("select * from nextval('xd.t_tracks_track_seq'::regclass)")
self.write(fo,(_id,t.tm,dev,vc,t.utr,t.meas,head,night,)) eop = t.eop if 'eop' in t._fileds else r'\N'
tracks_crc[t.crc] = _id self.write(fo,(_id,t.tm,dev,vc,t.utr,t.meas,head,night,eop))
tracks_crc[t.crc] = dict(track=_id,tm=t.tm)
else: else:
_id = tracks_crc[t.crc] _id = tracks_crc[t.crc]['track']
tm = tracks_crc[t.crc]['tm']
# запись в файл для xd.t_filetracks # запись в файл для xd.t_filetracks
self.write(fx,(_id,t.tm,self.fid,_loaded,)) self.write(fx,(_id,tm,self.fid,_loaded,))
self.state |= 256 self.state |= 256
if skip_res and self.dbtype == 'om': self.state |= 16
fo.seek(0); fx.seek(0) fo.seek(0); fx.seek(0)
await self.partition_tracks() await self.partition_tracks()
async with self.transaction(): async with self.transaction():
await self.create_doc() await self.create_doc()
await self.copy_to_table('t_tracks',source=fo,schema_name='xd',delimiter='\t') if not (skip_res and self.dbtype == 'om'):
await self.copy_to_table('t_filetrack',source=fx,schema_name='xd',delimiter='\t',columns=['track','tm','fid','loaded']) await self.copy_to_table('t_tracks',source=fo,schema_name='xd',delimiter='\t')
task = await self.fetchval("select * from dbq.put_task_seance($1,$2)",self.fid,self.tasktyp) await self.copy_to_table('t_filetrack',source=fx,schema_name='xd',delimiter='\t',columns=['track','tm','fid','loaded'])
if task: logger.info(f"Создана задача {task} идентификации измерений {self.args.fname} ({self.fid}).") task = await self.fetchval("select * from dbq.put_task_seance($1,$2)",self.fid,self.tasktyp)
else: logger.warning(f"Не создана задача идентификации измерений {self.args.fname} ({self.fid}).") if task: logger.info(f"Создана задача {task} идентификации измерений {self.args.fname} ({self.fid}).")
else: logger.warning(f"Не создана задача идентификации измерений {self.args.fname} ({self.fid}).")
class resFile(msFile): class resFile(msFile):
......
...@@ -8,6 +8,15 @@ from collections import namedtuple ...@@ -8,6 +8,15 @@ from collections import namedtuple
logger = logging.getLogger() logger = logging.getLogger()
track_list = {'sigma':float,'distrust':bool,'fake':bool}
bind_list = {'time':float,'id':int,'along':float,'across':float,'dmag':float}
meas_list = {'ra_j2000':float,'dec_j2000':float,'mag':float,'ra_j2000_full_error':float,'dec_j2000_full_error':float,'mag_error':float,}
eop_list = {'ra_j2000_error':float,'dec_j2000_error':float,'x':float,'x_error':float,'y':float,'y_error':float,
'fwhm_x',float,'fwhm_x_error':float,'fwhm_y':float,'fwhm_y_error':float,
'rot':float,'snr':float,'peak_snr':float,'flux':float,'ins_mag':float,'aper_width':float,'aper_height':float,
'channel':float,'filter':str,
}
def _hms(_angle): def _hms(_angle):
""" Метод. Преобразование угла из градусной меры в часовую. """ Метод. Преобразование угла из градусной меры в часовую.
...@@ -49,7 +58,7 @@ def _get_trackcrc(meas): ...@@ -49,7 +58,7 @@ def _get_trackcrc(meas):
for m in meas: for m in meas:
tm = datetime.strptime(m['utc'],'%d/%m/%Y %H:%M:%S.%f') tm = datetime.strptime(m['utc'],'%d/%m/%Y %H:%M:%S.%f')
t = '%.22s%.2f%.2f' % ( t = '%.22s%.2f%.2f' % (
tm.strftime('%Y-%m-%d %H:%M:%S.%f'), datetime.utcfromtimestamp(round(tm.timestamp(),2)).strftime('%Y-%m-%d %H:%M:%S.%f'),
_hms(float( m['ra_j2000'])), _hms(float( m['ra_j2000'])),
_hms(float( m['dec_j2000'])), _hms(float( m['dec_j2000'])),
) )
...@@ -72,15 +81,17 @@ def _get_meas_t(meas): ...@@ -72,15 +81,17 @@ def _get_meas_t(meas):
:linenos: :linenos:
:caption: _get_meas_t :caption: _get_meas_t
""" """
_fields = ('ra_j2000','dec_j2000','mag','ra_j2000_full_error','dec_j2000_full_error','mag_error',) ms,mags,files,eop,tb,te = list(),list(),list(),list(),datetime.max, datetime.min
ms,mags,tb,te = list(),list(),datetime.max, datetime.min
for m in meas: for m in meas:
tm = datetime.strptime(m['utc'],'%d/%m/%Y %H:%M:%S.%f') tm = datetime.strptime(m['utc'],'%d/%m/%Y %H:%M:%S.%f')
files.append(m['filename'])
tb,te = min(tb,tm),max(te,tm) tb,te = min(tb,tm),max(te,tm)
rec = [float(m.get(i,0.0)) for i in (_fields)] rec = [v(m[k]) for k,v in meas_list.items()]
mag = float(m.get('mag',0.0)) mag = float(m.get('mag',0.0))
if mag != 0: mags.append(mag) if mag != 0: mags.append(mag)
eop.append(json.dumps({k:f(t[k]) for k,f in eop_list.items() if k in t and t.get(k) is not None}))
if 'crops' in m: if 'crops' in m:
crops = r'\\\\\\\\x%s' % base64.b64decode(m['crops']).hex() crops = r'\\\\\\\\x%s' % base64.b64decode(m['crops']).hex()
ms.append(r'"(\\"%s\\",\\"%s\\",\\"%s\\")"' % (tm.isoformat(),str(rec).replace('[','{').replace(']','}'),crops,)) ms.append(r'"(\\"%s\\",\\"%s\\",\\"%s\\")"' % (tm.isoformat(),str(rec).replace('[','{').replace(']','}'),crops,))
...@@ -88,33 +99,34 @@ def _get_meas_t(meas): ...@@ -88,33 +99,34 @@ def _get_meas_t(meas):
ms.append(r'"(\\"%s\\",\\"%s\\",)"' % (tm.isoformat(),str(rec).replace('[','{').replace(']','}'),)) ms.append(r'"(\\"%s\\",\\"%s\\",)"' % (tm.isoformat(),str(rec).replace('[','{').replace(']','}'),))
avg_m = round(sum(mags)/len(mags),1) if len(mags) else 0.0 avg_m = round(sum(mags)/len(mags),1) if len(mags) else 0.0
return tb,te,avg_m,'{'+','.join(ms)+'}' return tb,te,avg_m,'{%s}' % ','.join(ms),files,'{%s}' % ','.join(eop)
def parsefile(f): def parsefile(f):
data = json.load(f)
tracks,tbeg,tend,mcnt = list(),datetime.max,datetime.min,0 tracks,tbeg,tend,mcnt = list(),datetime.max,datetime.min,0
data = json.load(f)
siteid = None
for t in data['tracks']: for t in data['tracks']:
tb,te,avg_m,ms = _get_meas_t(t['meas']) if not siteid: siteid = t['siteid']
tb,te,avg_m,ms,files,eop = _get_meas_t(t['meas'])
tbeg,tend,cnt = min(tbeg,tb),max(tend,te),len(t['meas']) tbeg,tend,cnt = min(tbeg,tb),max(tend,te),len(t['meas'])
mcnt += cnt mcnt += cnt
# Формируем проводку # Формируем проводку
params=dict( params = {k:f(t[k]) for k,f in track_list.items() if k in t and t.get(k) is not None}
cnt=cnt,mag=avg_m,obj=t['trackid'],bind=t.get('objectid'), params.update(dict(bind={k:f(t[k]) for k,f in bind_list.items() if k in t and t.get(k) is not None}))
area=t['target'] params.update(dict(cnt=cnt,mag=avg_m,obj=t['trackid'],area=t.get('target')))
)
tr = dict( tr = dict(
tm=te, tm=te,
meas=ms, meas=ms,
utr='["%s","%s"]' % (tb,te,), utr='["%s","%s"]' % (tb,te,),
crc=_get_trackcrc(t['meas']), crc=_get_trackcrc(t['meas']),
params=params params=params
eop=eop
files=files
) )
tracks.append(namedtuple('track_t',tr.keys())(**tr)) tracks.append(namedtuple('track_t',tr.keys())(**tr))
return 4,data['siteid'],tracks,tbeg,tend,dict(tr=len(tracks),ms=mcnt) return 4,siteid,tracks,tbeg,tend,dict(tr=len(tracks),ms=mcnt)
......
...@@ -47,7 +47,11 @@ def get_trackcrc(meas): ...@@ -47,7 +47,11 @@ def get_trackcrc(meas):
""" """
s = '' s = ''
for m in meas: for m in meas:
t = '%.22s%.2f%.2f' % (m.tm.strftime('%Y-%m-%d %H:%M:%S.%f'),m.params[0],m.params[1],) t = '%.22s%.2f%.2f' % (
datetime.utcfromtimestamp(round(m.tm.timestamp(),2)).strftime('%Y-%m-%d %H:%M:%S.%f'),
m.params[0],
m.params[1],
)
s += t s += t
return hs.md5(s.encode('utf8')).hexdigest() return hs.md5(s.encode('utf8')).hexdigest()
......
...@@ -54,7 +54,11 @@ def get_trackcrc(meas): ...@@ -54,7 +54,11 @@ def get_trackcrc(meas):
""" """
s = '' s = ''
for m in meas: for m in meas:
t = '%.22s%.2f%.2f' % (m.tm.strftime('%Y-%m-%d %H:%M:%S.%f'),m.params[0],m.params[1],) t = '%.22s%.2f%.2f' % (
datetime.utcfromtimestamp(round(m.tm.timestamp(),2)).strftime('%Y-%m-%d %H:%M:%S.%f'),
m.params[0],
m.params[1],
)
s += t s += t
return hs.md5(s.encode('utf8')).hexdigest() return hs.md5(s.encode('utf8')).hexdigest()
......
...@@ -49,7 +49,7 @@ def _get_trackcrc(meas,is_degree): ...@@ -49,7 +49,7 @@ def _get_trackcrc(meas,is_degree):
for m in meas: for m in meas:
tm = datetime.fromisoformat(m[0]) tm = datetime.fromisoformat(m[0])
t = '%.22s%.2f%.2f' % ( t = '%.22s%.2f%.2f' % (
tm.strftime('%Y-%m-%d %H:%M:%S.%f'), datetime.utcfromtimestamp(round(tm.timestamp(),2)).strftime('%Y-%m-%d %H:%M:%S.%f'),
_hms(float( m[1]/15.0 )) if is_degree else float( m[1] ), # Переводим в часовой угол _hms(float( m[1]/15.0 )) if is_degree else float( m[1] ), # Переводим в часовой угол
_hms(float( m[2] )) if is_degree else float( m[2] ), _hms(float( m[2] )) if is_degree else float( m[2] ),
) )
......
...@@ -256,12 +256,14 @@ class measFile(baseFile): ...@@ -256,12 +256,14 @@ class measFile(baseFile):
await self.execute("select * from template.partition_table('xd.t_trackproc'::regclass,$1)",self.tend) await self.execute("select * from template.partition_table('xd.t_trackproc'::regclass,$1)",self.tend)
await self.execute("select * from template.partition_table('xd.t_binds'::regclass,$1)",self.tbeg) await self.execute("select * from template.partition_table('xd.t_binds'::regclass,$1)",self.tbeg)
await self.execute("select * from template.partition_table('xd.t_binds'::regclass,$1)",self.tend) await self.execute("select * from template.partition_table('xd.t_binds'::regclass,$1)",self.tend)
await self.execute("select * from template.partition_table('xd.t_trackextra'::regclass,$1)",self.tbeg)
await self.execute("select * from template.partition_table('xd.t_trackextra'::regclass,$1)",self.tend)
async def get_db_tracks_crc(self,dev): async def get_db_tracks_crc(self,dev):
_crc = await self.fetchval(""" _crc = await self.fetchval("""
select json_object_agg(crc,track) select json_object_agg(crc,track)
from ( from (
select xd.track_crc(vc,meas) crc,track select xd.track_crc(vc,meas) crc,json_build_object('track',track,'tm',tm) track
from xd.t_tracks from xd.t_tracks
where tm between $1 and $2 where tm between $1 and $2
and utr && tsrange($1,$2,'[]') and utr && tsrange($1,$2,'[]')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment