Commit e84b32e1 authored by Takhir Fakhrutdinov's avatar Takhir Fakhrutdinov

новый loader

parent 7c60cee5
__all__ = ['cmn','dblog','leapsec','fileFactory','pgsetup','xFile','mFile','msFile','zFile','weaFile','ephFile','iersFile',
'tleFile','eopFile','tdmFile','gnssFile','tmsdbFile','hwlogFile']
# -*- coding: utf-8 -*-
import logging
import json
from datetime import datetime,timedelta,date,time
from decimal import Decimal
from .msFile import msFile
from .parsers import chirpfile1
from .parsers import chirpfile2
from . import leapsec as lp
import re
import io
import sys
logger = logging.getLogger('')
_2json = lambda obj: (
obj.isoformat()
if isinstance(obj, (datetime,date,time,))
else str(obj) if isinstance(obj, Decimal)
else None
)
class chFile(msFile):
""" Загрузка файла радиоизмерений
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/chFile.py
:language: python
:lines: 27,36-42
:linenos:
:caption: init
"""
super(chFile,self).__init__()
self.dbtype = 'ch'
self.atype = 10010
# Хардкод номера устройства ИРНР
self.tels = [152,]
self.telno = 80002
def load_data(self):
""" Метод. Загрузка файла радио измерений
.. literalinclude:: ../../../../fte/lib/libpy/chFile.py
:language: python
:lines: 43,52-136
:linenos:
:caption: load_data
"""
tcsv = io.StringIO()
csv = dict()
part = lambda x : datetime(x.year,x.month,1)
t_csv= io.StringIO()
f_csv= io.StringIO()
logger.info('Загрузка файла...')
self.create_loghandler()
data = self.parser(*self.get_parser_args())
self.tracks = data['tracks']
self.tbeg = data['tbeg']
self.tend = data['tend']
self.fileattr = dict(tr=data['tr'],ms=data['ms'],)
self.check_privilege_file()
self.check_privilege_device(self.tels[0])
self.remove_loghandler()
# Секционирование новых таблиц
self.partition_xd_tracks()
self.settrackids()
for t in self.tracks:
head = json.dumps(t['hdr'],default=_2json,separators=(',', ':',))
tcsv.write('\t'.join(map(str,(t['id'],self.telno,t['obj'],self.fid,head,)))+'\n')
for m in t['meas']:
self.put_meas(csv,t,m)
# Загрузка в новые таблицы...
self.set_tracks_crc()
for t in self.tracks:
tb,te = t['hdr'].pop('tb'),t['hdr'].pop('te')
utr = '["%s","%s"]' % (tb,te,)
params = t['hdr'].copy()
params['obj'] = t['obj']
crc = self._get_trackcrc(t['meas'])
_loaded = not self.tracks_crc or crc not in self.tracks_crc
if _loaded:
meas = self._get_meas_t(t['meas'])
head = json.dumps(params,default=_2json,separators=(',', ':',))
# Здесь проверка контрольной суммы проводки
# Запись в файл для xd.t_tracks
_id = self.get_trackid() if 'id' not in t else t['id']
t_csv.write('\t'.join(map(str,(_id,te,self.tels[0],self.vc,utr,meas,head)))+'\n')
else:
_id = self.tracks_crc[crc]
# запись в файл для xd.t_filetracks
f_csv.write('\t'.join(map(str,(_id,te,self.fid,_loaded,)))+'\n')
self.state |= 256
tcsv.seek(0)
t_csv.seek(0)
f_csv.seek(0)
#sys.stdout.write(t_csv.read())
#sys.stdout.write(f_csv.read())
#for c in csv:
# csv[c].seek(0)
# sys.stdout.write(csv[c].read())
with self:
self.create_doc()
# Новые таблицы
with self.cursor() as cr:
#cr.copy_from(t_csv,'xd.t_tracks')
#cr.copy_from(f_csv,'xd.t_filetrack',columns=('track','tm','fid','loaded'))
cr.copy_expert('copy xd.t_tracks from stdin',t_csv)
cr.copy_expert('copy xd.t_filetrack(track,tm,fid,loaded) from stdin',f_csv)
with self.cursor() as cr:
#cr.copy_from(tcsv,'main.t_nip_res',columns=('trans_id','nip_id','obj_id','file_id','hdr'))
cr.copy_expert('copy main.t_nip_res(trans_id,nip_id,obj_id,file_id,hdr)) from stdin',tcsv)
for c in csv:
csv[c].seek(0)
cr.callproc('main.partition_meas',(c,))
tbl, = cr.fetchone()
#cr.copy_from(csv[c],tbl)
cr.copy_expert('copy {} from stdin'.format(tbl),csv[c])
if not self.args.archive:
if self.tend > datetime.utcnow()-timedelta(days=1000):
cr.execute("select ctl.create_tasks_seance(%s,true)",(self.fid,))
logger.info('Созадна задача идентификации измерений. Номер файла: %s.',self.fid)
else:
logger.info('Задача идентификации измерений не создаётся. Устаревшие данные.')
class chFileTypeOne(chFile):
""" Загрузка файла радиоизмерений в формате CH1
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/chFile.py
:language: python
:lines: 140,149-152
:linenos:
:caption: init
"""
super(chFileTypeOne,self).__init__()
self.parser = chirpfile1.parsefile
self.vc = 5
def getrec(self,t,m):
""" Метод. Формирование строки для записи в БД
Args:
t (datetime): время измерения
m (dict): параметры измерения
Return:
string: сформированная строка
.. literalinclude:: ../../../../fte/lib/libpy/chFile.py
:language: python
:lines: 153,168-172
:linenos:
:caption: getrec
"""
l = list(m['params'])
L = [0 for x in range(29)]
L[0],L[1],L[2],L[4],L[19] = l # az,el,r,amp,freq
return '\t'.join(map(str,(t['id'],lp.utc2ttm(m['tm']),self.telno,'{%s}' % ','.join(map(str,L)),)))+'\n'
def get_parser_args(self):
""" Метод. Формирование параметров вызова парсера файла
Return:
list: массив параметров
.. literalinclude:: ../../../../fte/lib/libpy/chFile.py
:language: python
:lines: 173,185-193
:linenos:
:caption: get_parser_args
"""
m = re.search(r'\_(?P<dt>\d{8}\_\d{4})\_',self.args.fname)
dt = datetime.strptime(m.group('dt'),'%Y%m%d_%H%M')
with self.cursor() as cr:
cr.execute("select * from ic.devseq_nextval(%(dev)s,'obj',to_char(main.devget_night(%(dev)s,%(dt)s),'YYYYDDD'))",
dict(dev=self.tels[0],dt=dt))
obj, = cr.fetchone()
return (self.f,dt,obj,)
class chFileTypeTwo(chFile):
""" Загрузка файла радиоизмерений в формате CH2
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/chFile.py
:language: python
:lines: 197,206-209
:linenos:
:caption: init
"""
super(chFileTypeTwo,self).__init__()
self.parser = chirpfile2.parsefile
self.vc = 6
def getrec(self,t,m):
""" Метод. Формирование строки для записи в БД
Args:
t (datetime): время измерения
m (dict): параметры измерения
Return:
string: сформированная строка
.. literalinclude:: ../../../../fte/lib/libpy/chFile.py
:language: python
:lines: 210,225-229
:linenos:
:caption: getrec
"""
l = list(m['params'])
L = [0 for x in range(29)]
L[0],L[1],L[2] = l # az,el,r
return '\t'.join(map(str,(t['id'],lp.utc2ttm(m['tm']),self.telno,'{%s}' % ','.join(map(str,L)),)))+'\n'
def get_parser_args(self):
""" Метод. Формирование параметров вызова парсера файла
Return:
list: массив параметров
.. literalinclude:: ../../../../fte/lib/libpy/chFile.py
:language: python
:lines: 230,242-
:linenos:
:caption: get_parser_args
"""
dt = datetime.strptime(re.search(r'[_](?P<dt>\d{8})[_]',self.args.fname).group('dt'),'%Y%m%d')
return (self.f,dt)
# -*- coding: utf-8 -*-
## @package cmn
# Общие функции и глобальные переменные
import re
import json
import decimal
import datetime
_number_patterns = {
re.compile(r"^(?P<s>[-+]?\d+)$"):lambda x : int(x.group('s')),
re.compile(r"^(?P<s>[-+]?(\d+\.(\d*)?|\.\d+)([eE][-+]?\d+)?)$"):lambda x: float(x.group('s')),
}
_datere_ = re.compile(r"^((?P<yy0>\d{4})[-./](?P<m0>\d{2})[-./](?P<d0>\d{2})|(?P<d1>\d{2})[-./](?P<m1>\d{2})[-./](?P<yy1>\d{4}))([T| ](?P<hh>\d{2})\:(?P<mm>\d{2})(\:(?P<ss>\d{2})(\.(?P<ms>\d+))?)?((?P<tzh>[+-]\d{2})\:(?P<tzm>\d{2}))?)?")
def date_parse(value):
""" Функция. Преобразование строки в datetime
Args:
value (str): исходная строка
Return:
(datetime): преобразованное значение
.. literalinclude:: ../../../../fte/lib/libpy/cmn.py
:language: python
:lines: 17,32-44
:linenos:
:caption: date_parse
"""
if type(value) in (unicode,str):
m = _datere_.match(value)
if m:
d = {k:int(v) if v else 0 for k,v in m.groupdict().iteritems()}
d['ms'] = int(m.group('ms').rjust(6,'0')) if m.group('ms') else 0
return datetime.datetime(
d['yy0'] if 'yy0' in d else d['yy1'],
d['m0'] if 'm0' in d else d['m1'],
d['d0'] if 'd0' in d else d['d1'],
d['hh']-d['tzh'],d['mm']-d['tzm'],d['ss'],d['ms']
)
return value
def parse(value):
""" Функция.Преобразование строки в типизированное значение
Args:
value (str): исходная строка
Return:
(any): преобразованное значение
.. literalinclude:: ../../../../fte/lib/libpy/cmn.py
:language: python
:lines: 45,60-69
:linenos:
:caption: parse
"""
if type(value) in (unicode,str):
for p in _number_patterns:
m = p.match(value.strip())
if m: return _number_patterns[p](m)
return date_parse(value)
js_serialize = lambda x: (
x.isoformat() if isinstance(x,(datetime.datetime,datetime.date,datetime.time,))
else str(x) if isinstance(x,decimal.Decimal) else None)
def to_json(data):
""" Функция.Преобразование dict в json
Args:
data (dict): исходный словарь
Return:
строка в формате json
.. literalinclude:: ../../../../fte/lib/libpy/cmn.py
:language: python
:lines: 70,85-
:linenos:
:caption: to_json
"""
return json.dumps(data,default=js_serialize,separators=(',', ':',))
# -*- coding: utf-8 -*-
import logging
import psycopg2 as pg
import locale
import os
# Внимание! пароль worker'a должен быть прописан в файле .pgpass
# соединение использует переменные среды PGHOST,PGDATABASE,PGPORT
user = os.environ.get('PGUSER','worker')
port = os.environ.get('PGPORT','54327')
host = os.environ.get('PGHOST','/tmp')
appname = 'logger'
dsn = 'postgresql://{0}@/ncsat-d?port={2}&host={1}&application_name={3}'.format(user,host,port,appname)
class loggerDbHandler(logging.Handler):
""" Обработчик журнала работы программы.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/dblog.py
:language: python
:lines: 15,24-29
:linenos:
:caption: __init__
"""
logging.Handler.__init__(self)
self.setFormatter(logging.Formatter(fmt='%(module)s:%(lineno)d:%(message)s'))
self.setLevel(logging.INFO)
self.live = pg.connect(dsn)
self.live.autocommit = True
def __del__(self):
""" Деструктор.
.. literalinclude:: ../../../../fte/lib/libpy/dblog.py
:language: python
:lines: 30,39-41
:linenos:
:caption: __del__
"""
self.live.close()
del self.live
def emit(self,record):
""" Метод. Запись в журнал событий БД
Args:
record (list): параметры сообщения
.. literalinclude:: ../../../../fte/lib/libpy/dblog.py
:language: python
:lines: 42,54-59
:linenos:
:caption: emit
"""
message = self.format(record)
with self.live.cursor() as cr:
cr.callproc("logging",(record.levelno,message,))
if record.levelno > logging.INFO:
cr.callproc('ctl.send_telegram',(message.split('\n'),))
logger = logging.getLogger('')
logger.addHandler(loggerDbHandler())
locale.setlocale(locale.LC_ALL, 'ru_RU.UTF-8')
# -*- coding: utf-8 -*-
from psycopg2.extensions import connection as _connection
import logging
import io
import json
import base64
from datetime import datetime,timedelta
import hashlib as hs
from decimal import Decimal
from .pgsetup import *
from .xFile import xFile
from .mFile import mFile
from .tmFmt import time_format
import sys
logger = logging.getLogger('')
class eopFile(xFile,mFile):
""" Запись измерений ЭОП в БД.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/eopFile.py
:language: python
:lines: 23,32-44
:linenos:
:caption: __init__
"""
super(eopFile,self).__init__()
self.parser = json.load
self.dbtype = 'eop'
self.tbeg = datetime.max
self.tend = datetime.min
self.tracks = list()
self.tels = list()
self.mcnt = 0
self.vc = 4
self.state = 0
self.log = io.StringIO()
# Формирование строки для типа meas_t
def _get_meas_t(self,meas):
""" Метод. Формирование строки для записи трека в БД
Args:
meas (dict): параметры измерения
Return:
(list): начало трека, окончание трекаб средний блеск, строка для записи трека в БД
.. literalinclude:: ../../../../fte/lib/libpy/eopFile.py
:language: python
:lines: 45,60-76
:linenos:
:caption: _get_meas_t
"""
_fields = ('ra_j2000','dec_j2000','mag','ra_j2000_full_error','dec_j2000_full_error','mag_error',)
ms = list()
mags = list()
tb,te = datetime.max, datetime.min
for m in meas:
tm = datetime.strptime(m['utc'],time_format(m['utc']))
tb,te = min(tb,tm),max(te,tm)
rec = [float(m.get(i,0.0)) for i in (_fields)]
crops = r'\\\\\\\\x%s' % base64.b64decode(m['crops']).hex()
mag = float(m.get('mag',0.0))
if mag != 0: mags.append(mag)
ms.append(r'"(\\"%s\\",\\"%s\\",\\"%s\\")"' % (tm.isoformat(),str(rec).replace('[','{').replace(']','}'),crops,))
avg_m = round(sum(mags)/len(mags),1) if len(mags) else 0.0
return tb,te,avg_m,'{'+','.join(ms)+'}'
def _hms(self,_angle):
""" Метод. Преобразование угла из градусной меры в часовую.
Args:
_angle (float): значение угла в градусах
Return:
(float): значение угла в ЧЧММСС.ссссс
.. literalinclude:: ../../../../fte/lib/libpy/eopFile.py
:language: python
:lines: 77,92-98
:linenos:
:caption: _hms
"""
hh,d = divmod(abs(_angle),1)
sec = d * 3600.0
mm,ss = divmod(sec,60.0)
res = hh*10000+mm*100+round(ss,2)
return res if _angle >= 0 else -res
def _get_trackcrc(self,meas):
""" Метод. Расчёт контрольной суммы трека.
Args:
meas (dict): измерения трека
Return:
(str): контрольная сумма
.. literalinclude:: ../../../../fte/lib/libpy/eopFile.py
:language: python
:lines: 99,114-124
:linenos:
:caption: _get_trackcrc
"""
s = ''
for m in meas:
tm = datetime.strptime(m['utc'],time_format(m['utc']))
t = '%s%.2f%.2f' % (
tm.strftime('%Y-%m-%d %H:%M:%S.%f'),
self._hms(float( m['ra_j2000'])),
self._hms(float( m['dec_j2000'])),
)
s += t
return hs.md5(s.encode('utf8')).hexdigest()
def process_data(self,tracks):
""" Метод. Обработка треков.
Args:
tracks (dict): Треки объектов.
.. literalinclude:: ../../../../fte/lib/libpy/eopFile.py
:language: python
:lines: 125,137-150
:linenos:
:caption: process_data
"""
for t in tracks:
tb,te,avg_m,ms = self._get_meas_t(t['meas'])
self.tbeg = min(self.tbeg,tb)
self.tend = max(self.tend,te)
self.mcnt += len(t['meas'])
# Формируем проводку
self.tracks.append(dict(
tm=te,meas=ms,utr='["%s","%s"]' % (tb,te,),
crc=self._get_trackcrc(t['meas']),
params=dict(
cnt=len(t['meas']),mag=avg_m,obj=t['trackid'],bind=t.get('objectid'),
area=t['target']
)))
def load_data(self):
""" Метод. Загрузка файла измерений.
.. literalinclude:: ../../../../fte/lib/libpy/eopFile.py
:language: python
:lines: 151,160-
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.check_privilege_file()
data = self.parser(self.f)
# Формируем измерения
self.process_data(data['tracks'])
# Арибуты файла
self.fileattr = dict(tr=len(self.tracks),ms=self.mcnt)
# Секционирование таблиц
self.partition_xd_tracks()
t_csv= io.StringIO()
f_csv= io.StringIO()
# Загрузка в новые таблицы...
self.set_devid(data['siteid'])
self.check_privilege_device(self.tels[0])
self.set_tracks_crc()
with self:
self.create_doc()
for t in self.tracks:
if not self.tracks_crc or t['crc'] not in self.tracks_crc:
head = json.dumps(t['params'],separators=(',', ':',))
# Здесь проверка контрольной суммы проводки
# Запись в файл для xd.t_tracks
_id = self.get_trackid() if 'id' not in t else t['id']
_loaded = 't' if 'id' not in t else 'f'
t_csv.write('\t'.join(map(str,(_id,t['tm'],self.tels[0],self.vc,t['utr'],t['meas'],head)))+'\n')
else:
_id = self.tracks_crc[t['crc']]
_loaded = 'f'
# запись в файл для xd.t_filetrack
f_csv.write('\t'.join(map(str,(_id,t['tm'],self.fid,_loaded,)))+'\n')
t_csv.seek(0)
f_csv.seek(0)
#sys.stdout.write(t_csv.read())
#sys.stdout.write(f_csv.read())
with self.cursor() as cr:
#cr.copy_from(t_csv,'xd.t_tracks')
#cr.copy_from(f_csv,'xd.t_filetrack',columns=('track','tm','fid','loaded'))
cr.copy_expert('copy xd.t_tracks from stdin',t_csv)
cr.copy_expert('copy xd.t_filetrack(track,tm,fid,loaded) from stdin',f_csv)
# формируем "старые" измерения
with self.cursor() as cr:
cr.callproc('main.partition_meas',(self.tbeg,))
cr.callproc('main.partition_meas',(self.tend,))
cr.execute("""
insert into main.t_nip_res
select track
,main.devget_telno(dev) nip_id
,fid
,null::text
,(params::jsonb||jsonb_build_object('tb',lower(utr),'te',upper(utr)))-array['obj','area','channel'] hdr
,(params->>'obj')::bigint obj_id
from xd.t_tracks t
join xd.t_filetrack f using(track,tm)
where fid = %(fid)s
and t.tm between %(tbeg)s and %(tend)s
and f.tm between %(tbeg)s and %(tend)s
""",dict(fid=self.fid,tbeg=self.tbeg,tend=self.tend))
cr.execute("""
insert into main.t_nip_meas
select track
,time_ttm(ms.tm)
,main.devget_telno(dev) nip_id
,array[
ms[1]
,ms[2]
,ms[4]
,ms[3]
,ms[5]
]::float[]
from xd.t_tracks t
join xd.t_filetrack f using(track,tm)
,unnest(meas) ms
where fid = %(fid)s
and t.tm between %(tbeg)s and %(tend)s
and f.tm between %(tbeg)s and %(tend)s
""",dict(fid=self.fid,tbeg=self.tbeg,tend=self.tend))
if not self.args.archive:
if self.tend > datetime.utcnow()-timedelta(days=45):
cr.execute("select ctl.create_tasks_seance(%s)",(self.fid,))
logger.info('Создана задача идентификации измерений. Номер файла: %s.',self.fid)
else:
logger.info('Задача идентификации измерений не создаётся. Устаревшие данные.')
# -*- coding: utf-8 -*-
import logging
import io
from datetime import datetime,timedelta
from .xFile import xFile
from .parsers import yuma,ctle
logger = logging.getLogger('')
class yumaFile(xFile):
""" Загрузка файлов орбит КО, полученных из внешних источников.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 14,23-25
:linenos:
:caption: __init__
"""
super(yumaFile,self).__init__()
self.parser = yuma.parsefile
def current_constellation(self):
""" Метод. Получение номера соотвествия номеров prn номерам КА в БД.
Args:
sat_system str: система GNSS
Return:
dict: таблица перевода номеров prn
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 26,40-44
:linenos:
:caption: obj2nko
"""
with self:
with self.cursor() as cr:
cr.execute("""
select json_object_agg(coalesce(gps,substr(prn,2)),norad)
from (
select distinct on (prn) prn,norad
from main.t_gnss_const
where satsys = %s
and tmr @> 'now'::timestamp
order by prn,created desc
) r left join main.t_gnss_gps2qzss on (prn = qzss)
""",(self.satSystem,))
r, = cr.fetchone()
return r
def load_data(self):
""" Метод. Загрузка файла орбит КА в БД.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 45,54-88
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.create_loghandler()
self.check_privilege_file()
orblist = self.parser(self.f,self.startDate)
prn2nko = self.current_constellation()
csv = io.StringIO()
self.tbeg = datetime.max
self.tend = datetime.min
for l in orblist:
norad = prn2nko.get(l['prn'])
if norad:
rec = [self.fid,norad,l['bdt'],l['sec'],l['rec'],self.ephtp]
csv.write('\t'.join(map(str,rec))+'\n')
tm = l['bdt']+timedelta(seconds=l['sec'])
self.tbeg = min(self.tbeg,tm)
self.tend = max(self.tend,tm)
else:
logger.warning('Неизвестный объект prn[%s] строка пропускается.',l['prn'])
self.remove_loghandler()
csv.seek(0)
self.fileattr['ko'] = len(orblist)
with self:
self.create_doc()
with self.cursor() as cr:
#cr.copy_from(csv,'main.t_ephemeris',columns=('fid','nko','bdt','sec','params','tp'))
cr.copy_expert("copy main.t_ephemeris(fid,nko,bdt,sec,params,tp) from stdin",csv)
if not self.args.archive and self.task_enabled:
cr.execute("select ctl.create_task(%(taskno)s,%(fid)s,0,%(fid)s::text)",dict(taskno=self.taskno,fid=self.fid))
class gpsAlmFile(yumaFile):
""" Загрузка файлов орбит КА в формате альманаха GPS.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 167,176-184
:linenos:
:caption: __init__
"""
super(gpsAlmFile,self).__init__()
self.satSystem = 'GPS'
self.startDate = datetime(1980,1,6)
self.dbtype = 'Ga'
self.taskno = 53
self.task_enabled = True
self.ephtp = 'Y'
class qzssAlmFile(yumaFile):
""" Загрузка файла орбит КА QZSS.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 190,199-207
:linenos:
:caption: __init__
"""
super(qzssAlmFile,self).__init__()
self.satSystem = 'QZSS'
self.startDate = datetime(1980,1,6)
self.dbtype = 'Ga'
self.taskno = 53
self.task_enabled = True
self.ephtp = 'Y'
class bduAlmFile(yumaFile):
""" Загрузка файла орбит навигационных КА BEDOU.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 143,152-161
:linenos:
:caption: __init__
"""
super(bduAlmFile,self).__init__()
self.satSystem = 'BDS'
self.startDate = datetime(2006,1,1)
self.dbtype = 'Gb'
self.taskno = 55
self.task_enabled = True
self.ephtp = 'B'
class ctleFile(xFile):
""" Загрузка файла орбит КА по данным IRLS в формате TLE.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 121,130-137
:linenos:
:caption: __init__
"""
super(ctleFile,self).__init__()
self.parser = ctle.parsefile
self.dbtype = 'tl'
self.taskno = 45
self.task_enabled = True
self.ephtp = 'S'
def load_data(self):
""" Метод. Загрузка файла орбит КА в БД.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 45,54-88
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.create_loghandler()
self.check_privilege_file()
orblist = self.parser(self.f)
csv = io.StringIO()
self.tbeg = datetime.max
self.tend = datetime.min
for l in orblist:
rec = [self.fid,l['norad'],l['bdt'],l['sec'],l['rec'],self.ephtp]
csv.write('\t'.join(map(str,rec))+'\n')
tm = l['bdt']+timedelta(seconds=l['sec'])
self.tbeg = min(self.tbeg,tm)
self.tend = max(self.tend,tm)
self.remove_loghandler()
csv.seek(0)
self.fileattr['ko'] = len(orblist)
with self:
self.create_doc()
with self.cursor() as cr:
#cr.copy_from(csv,'main.t_ephemeris',columns=('fid','nko','bdt','sec','params','tp'))
cr.copy_expert("copy main.t_ephemeris(fid,nko,bdt,sec,params,tp) from stdin",csv)
if not self.args.archive and self.task_enabled:
cr.execute("select ctl.create_task(%(taskno)s,%(fid)s,0,%(fid)s::text)",dict(taskno=self.taskno,fid=self.fid))
This diff is collapsed.
# -*- coding: utf-8 -*-
import logging
import io
from datetime import datetime,timedelta
from .xFile import xFile
from .parsers import gloephem,gnssconst
logger = logging.getLogger('')
class ephemFile(xFile):
""" Загрузка файлов орбит КО, полученных из внешних источников.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 14,23-25
:linenos:
:caption: __init__
"""
super(ephemFile,self).__init__()
self.dbtype = 'oe'
def load_data(self):
""" Метод. Загрузка файла орбит КА в БД.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 45,54-88
:linenos:
:caption: load_data
"""
self.parser = gloephem.parsefile
logger.info('Загрузка файла...')
self.create_loghandler()
self.check_privilege_file()
ephem = self.parser(self.f,self.fid)
csv = io.StringIO()
self.tbeg = datetime.max
self.tend = datetime.min
for rec in ephem:
tm = rec[2]
self.tbeg,self.tend = min(self.tbeg,tm),max(self.tend,tm)
csv.write('\t'.join(map(str,rec))+'\n')
self.remove_loghandler()
csv.seek(0)
self.fileattr['ko'] = len(ephem)
with self:
self.create_doc()
with self.cursor() as cr:
cr.copy_expert("copy main.t_gnss_ephem(fid,svn,tm,params,prn) from stdin",csv)
# задачи пока нет
#if not self.args.archive and self.task_enabled:
# cr.execute("select ctl.create_task(%s,%%(fid)s,0,%%(fid)s::text)" % self.taskno,dict(fid=self.fid))
class constFile(xFile):
""" Загрузка файлов орбит КО, полученных из внешних источников.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 14,23-25
:linenos:
:caption: __init__
"""
super(constFile,self).__init__()
self.dbtype = 'gnc'
def load_data(self):
""" Метод. Загрузка файла орбит КА в БД.
.. literalinclude:: ../../../../fte/lib/libpy/ephFile.py
:language: python
:lines: 45,54-88
:linenos:
:caption: load_data
"""
self.parser = gnssconst.parsefile
logger.info('Загрузка файла...')
self.create_loghandler()
self.check_privilege_file()
constelation = self.parser(self.f,self.fid)
csv = io.StringIO()
self.tbeg = datetime.max
self.tend = datetime.min
for r in constelation:
tm = r[5]
self.tbeg,self.tend = min(self.tbeg,tm),max(self.tend,tm)
rng = '["{}",{})'.format(r[5],'"{}"'.format(r[6]) if r[6] else '')
csv.write('\t'.join(map(str,r[:5]+[rng]))+'\n')
self.remove_loghandler()
csv.seek(0)
with self:
self.create_doc()
with self.cursor() as cr:
cr.copy_expert("copy main.t_gnss_const(fid,satsys,svn,norad,prn,tmr) from stdin",csv)
# -*- coding: utf-8 -*-
import logging
import hashlib as hs
from datetime import datetime,timedelta
from .xFile import xFile
import re
import io
import sys
logger = logging.getLogger('')
class hwlogFile(xFile):
months = dict(Jan=1,Feb=2,Mar=3,Apr=4,May=5,Jun=6,Jul=7,Aug=8,Sep=9,Oct=10,Nov=11,Dec=12)
states = dict(standby=2.0,online=3.0,suspend=1.0,offline=0.0)
values = {'open':1.0,'close':0.0,'false':0.0,'true':1.0}
""" Загрузка файла журнала работы ЭОС.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/hwFile.py
:language: python
:lines: 181,190-195
:linenos:
:caption: __init__
"""
super(hwlogFile,self).__init__()
self.tbeg = datetime.max
self.tend = datetime.min
self.dbtype = 'hwl'
def get_device(self):
""" Метод. Определение идентификатора устройства.
Return:
(int): идентификатор
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python
:lines: 133,145-155
:linenos:
:caption: get_device
"""
# Определяем идентификатор метеостанции... зашит в имени файла
telno = re.search(r'^\d+',self.args.fname).group(0)
atype = 510007;
with self.cursor() as cr:
cr.execute("select dev from main.t_devattr where atype=%s and unq and dtr @> 'now'::date and v = %s",(atype,telno,))
if not cr.rowcount:
raise RuntimeError('Не удалось идентифицировать КСОЭС ид: %s.' % id)
dev, = cr.fetchone()
self.fileattr['dev'] = dev
return dev
def _gettm(self,dp,now,year):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/hwFile.py
:language: python
:lines: 261,270-
:linenos:
:caption: load_data
"""
mn,d,h,m,s = dp.groups()
tm = datetime(year,self.months[mn],int(d),int(h),int(m),int(s))
if (tm-now) > timedelta(days=1):
tm = datetime(year-1,self.months[mn],int(d),int(h),int(m),int(s))
return tm
def load_data(self):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/hwFile.py
:language: python
:lines: 261,270-
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.create_loghandler()
self.check_privilege_file()
sp = re.compile(r"^(?P<dt>[A-Z][a-z]{2}\s+\d+\s\d{2}:\d{2}:\d{2})\s(?P<cl>.+)\shwstats:\s(?P<tp>\w+)(.(?P<num>\d+))?(\.)(?P<prm>.+)\=(?P<v>.+$)",re.M)
dp = re.compile(r"^(?P<mon>[A-Z][a-z]{2})\s+(?P<d>\d+)\s(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})")
dev = self.get_device()
metrix,clis,tps,prms = set(),dict(),dict(),dict()
sql = "select json_object_agg({0}name,{0}) from xd.t_devhardware_type_{0}"
with self.cursor() as cr:
cr.execute(sql.format('cli'))
clis, = cr.fetchone()
if not clis: clis = dict();
cr.execute(sql.format('tp'))
tps, = cr.fetchone()
if not tps: tps = dict();
cr.execute(sql.format('prm'))
prms, = cr.fetchone()
if not prms: prms = dict();
cr.execute("select array_agg(member order by member) from main.t_devmembers where dev = %s",(dev,))
self.tels, = cr.fetchone()
cr.execute("select mrx from xd.t_devhardware_metrix where dev=%s",(dev,))
for l in cr:
metrix.add(l[0])
year = int(re.search(r'[_](\d{4})',self.args.fname).group(1))
now = datetime.utcnow()
csv = io.StringIO()
matches = sp.finditer(self.f.read())
for m in matches:
num = int(m.group('num')) if m.group('num') else 0
tm = self._gettm(dp.match(m.group('dt')),now,year)
self.tbeg = min(self.tbeg,tm)
self.tend = max(self.tend,tm)
cliname,tpname,prmname,val = m.group('cl'),m.group('tp'),m.group('prm'),m.group('v')
if prmname == 'state':v = self.states[val]
elif prmname.startswith('serial'): continue # серийные номера пока не используем...
elif val.lower() in self.values: v = self.values[val.lower()]
else: v = float(val)
# справочники и log diffearable
with self.cursor() as cr:
if cliname not in clis:
cr.execute("insert into xd.t_devhardware_type_cli(cliname) values(%s) on conflict do nothing returning cli",(cliname,))
v, = cr.fetchone()
clis[cliname] = v
logger.info("Добавлен клиент %s",v)
if tpname not in tps:
cr.execute("insert into xd.t_devhardware_type_tp(tpname) values(%s) on conflict do nothing returning tp",(tpname,))
v, = cr.fetchone()
tps[tpname] = v
logger.info("Добавлено устройство %s",v)
if prmname not in prms:
cr.execute("insert into xd.t_devhardware_type_prm(prmname) values(%s) on conflict do nothing returning prm",(prmname,))
v, = cr.fetchone()
prms[prmname] = v
logger.info("Добавлен параметр %s",v)
mrx = hash((dev,clis[cliname],tps[tpname],num,prms[prmname],))
if mrx not in metrix:
cr.execute("insert into xd.t_devhardware_metrix(mrx,dev,cli,tp,num,prm) values(%s,%s,%s,%s,%s,%s) on conflict do nothing",
(mrx,dev,clis[cliname],tps[tpname],num,prms[prmname],))
csv.write('{}\n'.format('\t'.join(map(str,(tm,mrx,v,)))))
csv.seek(0)
self.remove_loghandler()
self.create_doc()
with self.cursor() as cr:
cr.copy_expert('copy xd.t_devhardware_log from stdin',csv)
self.commit()
\ No newline at end of file
# -*- coding: utf-8 -*-
import io
import logging
from datetime import datetime
from .xFile import xFile
from .parsers import leap, iersfinals, eopc04
logger = logging.getLogger('')
class lpFile(xFile):
""" Загрузка значений высокосной секунды по данным IERS.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 14,23-26
:linenos:
:caption: __init__
"""
super(lpFile,self).__init__()
self.parser = leap.parsefile
self.dbtype = 'lp'
def load_data(self):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 27,36-50
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.create_loghandler()
self.check_privilege_file()
leapseconds = self.parser(self.f)
self.remove_loghandler()
with self:
with self.cursor() as cr:
cr.executemany("""
insert into main.t_leapsec values(%s,%s,%s,%s,%s)
on conflict(dt) do nothing""",leapseconds)
cr.execute('refresh materialized view public.v_taiutc')
cr.execute("select min(dt)::timestamp,max(dt)::timestamp from main.t_leapsec")
self.tbeg, self.tend = cr.fetchone()
self.create_doc()
class iersFile(xFile):
""" Загрузка файла о параметрах движения Земли по данным IERS.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 56,65-68
:linenos:
:caption: __init__
"""
super(iersFile,self).__init__()
self.parser = iersfinals.parsefile
self.dbtype = 'iers'
def load_data(self):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 69,78-97
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.create_loghandler()
self.check_privilege_file()
iers = self.parser(self.f)
self.remove_loghandler()
csv = io.StringIO()
for i in iers:
csv.write('%s\t%s\n' % (i[0].isoformat(),i[1],))
csv.seek(0)
with self:
with self.cursor() as cr:
cr.execute("truncate main.t_iers")
#cr.copy_from(csv,'main.t_iers')
cr.copy_expert('copy main.t_iers from stdin',csv)
cr.execute('refresh materialized view public.v_iers')
cr.execute("select min(dt)::timestamp,max(dt)::timestamp from main.t_iers")
self.tbeg, self.tend = cr.fetchone()
self.create_doc()
class solarFile(xFile):
""" Загрузка файла индексов Солнечной активности.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 103,112-114
:linenos:
:caption: __init__
"""
super(solarFile,self).__init__()
self.dbtype = 'si'
def load_data(self):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 115,124-133
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.check_privilege_file()
csv = io.StringIO()
for l in self.f:
l = l.strip('\n')
if len(l):
csv.write('{}\n'.format(l))
csv.seek(0)
with self:
with self.cursor() as cr:
cr.execute("truncate main.t_solarindex")
cr.copy_expert("copy main.t_solarindex(type,dt,f_10_7,f_81,kp) from stdin with delimiter ','",csv)
cr.execute('refresh materialized view public.v_solarindex')
cr.execute("select min(dt)::timestamp,max(dt)::timestamp from main.t_solarindex")
self.tbeg, self.tend = cr.fetchone()
self.create_doc()
class mpcCodeFile(xFile):
""" Загрука международных кодов телескопов.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 139,148-151
:linenos:
:caption: __init__
"""
super(mpcCodeFile,self).__init__()
self.dbtype = 'mcod'
self.tbeg = self.tend = datetime.utcnow()
def load_data(self):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 152,161-
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.check_privilege_file()
with self:
with self.cursor() as cr:
cr.execute("truncate main.t_obscodes")
#cr.copy_from(self.f,'main.t_obscodes',null='')
copy_expert("copy main.t_obscodes from stdin with null ''",self.f)
self.create_doc()
class eopc04File(xFile):
""" Загрузка файла о параметрах движения Земли по данным IERS.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 56,65-68
:linenos:
:caption: __init__
"""
super(eopc04File,self).__init__()
self.parser = eopc04.parsefile
self.dbtype = 'c04'
def load_data(self):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/iersFile.py
:language: python
:lines: 69,78-97
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.create_loghandler()
self.check_privilege_file()
c04 = self.parser(self.f)
self.remove_loghandler()
csv = io.StringIO()
for l in c04:
csv.write('{}\n'.format('\t'.join(l)))
csv.seek(0)
with self:
with self.cursor() as cr:
cr.execute("truncate main.t_iers_c04")
cr.copy_expert('copy main.t_iers_c04 from stdin',csv)
cr.execute('refresh materialized view public.v_iers_c04')
cr.execute("select min(dt)::timestamp,max(dt)::timestamp from main.t_iers_c04")
self.tbeg, self.tend = cr.fetchone()
self.create_doc()
# -*- coding: utf-8 -*-
from psycopg2.extensions import connection as _connection
import logging
import io
import json
from datetime import datetime,timedelta
from .pgsetup import *
from .xFile import xFile
from .mFile import mFile
from .parsers import lcvfile
#import sys
logger = logging.getLogger('')
class lcvFile(xFile,mFile):
""" Запись измерений ЭОП в БД.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/lcvFile.py
:language: python
:lines: 20,29-39
:linenos:
:caption: __init__
"""
super(lcvFile,self).__init__()
self.parser = lcvfile.parsefile
self.dbtype = 'lcv'
#self.tbeg = datetime.max
#self.tend = datetime.min
#self.tracks = list()
self.tels = list()
self.vc = 26
self.state = 0
self.log = io.StringIO()
def load_data(self):
""" Метод. Загрузка файла измерений.
.. literalinclude:: ../../../../fte/lib/libpy/lcvFile.py
:language: python
:lines: 40,49-
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.check_privilege_file()
data = self.parser(self.f)
# Арибуты файла
self.fileattr = dict(tr=data['tr'],ms=data['ms'])
self.tbeg, self.tend = data['tbeg'], data['tend']
# Секционирование таблиц
self.partition_xd_tracks()
t_csv,f_csv, n_csv = io.StringIO(),io.StringIO(),io.StringIO()
# Загрузка в новые таблицы...
self.set_devid(data['siteid'])
self.check_privilege_device(self.tels[0])
self.set_tracks_crc()
with self:
self.create_doc()
for t in data['tracks']:
if not self.tracks_crc or t['crc'] not in self.tracks_crc:
head = json.dumps(t['prm'],separators=(',', ':',))
_id = self.get_trackid()
_loaded = 't'
t_csv.write('\t'.join(map(str,(_id,t['tm'],self.tels[0],self.vc,t['utr'],t['meas'],head)))+'\n')
n_csv.write('\t'.join(map(str,(_id,t['tm'],t['nci']['utr'],t['nci']['data'])))+'\n')
else:
_id = self.tracks_crc[t['crc']]
_loaded = 'f'
# Проверка, а загружена ли НКИ ?
with self.cursor() as cr:
cr.execute('select 1 from xd.t_tracknci where track=%s and tm=%s',(_id,t['tm'],))
if not cr.rowcount:
n_csv.write('\t'.join(map(str,(_id,t['tm'],t['nci']['utr'],t['nci']['data'])))+'\n')
# запись в файл для xd.t_filetrack
f_csv.write('\t'.join(map(str,(_id,t['tm'],self.fid,_loaded,)))+'\n')
t_csv.seek(0)
f_csv.seek(0)
n_csv.seek(0)
#sys.stdout.write(t_csv.read())
#sys.stdout.write(f_csv.read())
with self.cursor() as cr:
#cr.copy_from(t_csv,'xd.t_tracks')
#cr.copy_from(f_csv,'xd.t_filetrack',columns=('track','tm','fid','loaded'))
cr.copy_expert('copy xd.t_tracks from stdin',t_csv)
cr.copy_expert('copy xd.t_filetrack(track,tm,fid,loaded) from stdin',f_csv)
cr.execute("select * from template.partition_table('xd.t_tracknci'::regclass,%s)",(self.tbeg,))
cr.execute("select * from template.partition_table('xd.t_tracknci'::regclass,%s)",(self.tend,))
#cr.copy_from(n_csv,'xd.t_tracknci')
cr.copy_expert('copy xd.t_tracknci from stdin',n_csv)
# формируем "старые" измерения
with self.cursor() as cr:
cr.callproc('main.partition_meas',(self.tbeg,))
cr.callproc('main.partition_meas',(self.tend,))
cr.execute("""
insert into main.t_nip_res
select track
,main.devget_telno(dev) nip_id
,fid
,null::text
,(params::jsonb||jsonb_build_object('tb',lower(utr),'te',upper(utr)))-array['obj','area','channel'] hdr
,(params->>'obj')::bigint obj_id
from xd.t_tracks t
join xd.t_filetrack f using(track,tm)
where fid = %(fid)s
and loaded
and t.tm between %(tbeg)s and %(tend)s
and f.tm between %(tbeg)s and %(tend)s
returning trans_id
""",dict(fid=self.fid,tbeg=self.tbeg,tend=self.tend))
rc = cr.rowcount
cr.execute("""
insert into main.t_nip_meas
select track
,time_ttm(ms.tm)
,main.devget_telno(dev) nip_id
,array[
_deg(ms[1])
,_deg(ms[2])
,ms[4]
,ms[3]
,ms[5]
]::float[]
from xd.t_tracks t
join xd.t_filetrack f using(track,tm)
,unnest(meas) ms
where fid = %(fid)s
and loaded
and t.tm between %(tbeg)s and %(tend)s
and f.tm between %(tbeg)s and %(tend)s
""",dict(fid=self.fid,tbeg=self.tbeg,tend=self.tend))
if not self.args.archive:
if self.tend > datetime.utcnow()-timedelta(days=45):
if rc>0:
cr.execute("select ctl.create_tasks_seance(%s)",(self.fid,))
logger.info('Создана задача идентификации измерений. Номер файла: %s.',self.fid)
else:
logger.info('Задача идентификации измерений не создаётся. Устаревшие данные.')
# -*- coding: utf-8 -*-
import logging
from datetime import datetime
from datetime import timedelta
import os
import time
import pickle
import certifi
try:
# Python 3
from urllib.request import urlopen
__pickle= '/tmp/leapsec.3.pkl'
except:
# Python 2
from urllib2 import urlopen
__pickle= '/tmp/leapsec.pkl'
logger = logging.getLogger('')
__lock = '/tmp/leapsec.lock'
__tai = timedelta(seconds=32.184000)
__1970_01_01 = datetime(1970,1,1)
def utc2tt(tm):
""" Функция. Вичисление времени TT по заданному времени UTC
Args:
tm (datetime): время UTC
Return:
(datetime): время TT
.. literalinclude:: ../../../../fte/lib/libpy/leapsec.py
:language: python
:lines: 26,41-49
:linenos:
:caption: utc2tt
"""
for t in __taiutc:
if t[0] < tm:
return tm+t[1] + __tai
logger.debug(__taiutc)
raise RuntimeError('Невозможно определить время..."%s"' % tm)
tt2ttm = lambda tm : int((tm-__1970_01_01).total_seconds()*1000000)
utc2ttm = lambda tm : int((utc2tt(tm)-__1970_01_01).total_seconds()*1000000)
def tt2utc(tm):
""" Функция. Вичисление времени UTC по заданному времени TT
Args:
tm (datetime): время TT
Return:
(datetime): время UTC
.. literalinclude:: ../../../../fte/lib/libpy/leapsec.py
:language: python
:lines: 50,65-71
:linenos:
:caption: tt2utc
"""
for t in __taiutc:
if (t[0]+t[1]+__tai) < tm:
return tm - (t[1] + __tai)
raise RuntimeError('Невозможно определить время..."%s"',tm)
ttm2utc = lambda ttm: tt2utc(datetime.utcfromtimestamp(ttm*0.000001))
def mjd2cal(mjd):
""" Функция. Преобразование времени в формате MJD в календарное.
Args:
mjd (float): время в формате MJD
Return:
(datetime): время календарное
.. literalinclude:: ../../../../fte/lib/libpy/leapsec.py
:language: python
:lines: 72,87-90
:linenos:
:caption: mjd2cal
"""
# mjd 40587 - 1 jan 1970
return datetime.utcfromtimestamp((mjd-40587.0)*86400.0)
def __init():
""" Функция. Инициализация таблицы для вычислений ТТ и UTC
Return:
(list): таблица для вычислений
.. literalinclude:: ../../../../fte/lib/libpy/leapsec.py
:language: python
:lines: 91,103-117
:linenos:
:caption: __init
"""
# https не работает 27.04.2018 ????
# https работает 24.11.2018 ????
u = urlopen('https://www.ietf.org/timezones/data/leap-seconds.list',cafile=certifi.where())
#u = urlopen('http://www.ietf.org/timezones/data/leap-seconds.list')
tbl = []
for r in u:
r = r.decode()
if r[0] != '#':
rint = list(map(int,r[:r.index('#')].strip().split('\t')))
# 2208988800 разность мажду NTP timestamp (1900-01-01) и Unix timestamp (1970-01-01)
rint[0] = datetime.utcfromtimestamp(rint[0]-2208988800)
rint[1] = timedelta(seconds=rint[1])
tbl.append(rint)
return sorted(tbl,key=lambda x : x[0],reverse=True)
def __writepicle():
""" Функция. Сохранения таблицы вичислений TT и UTC
.. literalinclude:: ../../../../fte/lib/libpy/leapsec.py
:language: python
:lines: 118,127-139
:linenos:
:caption: __writepicle
"""
logger.debug('write pickle...')
if not os.path.exists(__lock):
logger.debug('lock not exists...')
try:
with open(__lock,'w'):
pass
with open(__pickle,'wb') as f:
pickle.dump(list(__init()),f)
finally:
os.remove(__lock)
logger.info('Файл leapsec.pkl обновлён...')
else:
logger.debug('lock detected...')
# кэшируем обращение по инету
try:
lt = os.path.getmtime(__pickle)
except OSError:
__writepicle()
else:
if (time.time() - lt) > 86400*7:
__writepicle()
while os.path.exists(__lock):
logger.debug('Ждём обновления файла leapsec.pkl...')
time.sleep(5)
with open(__pickle,'rb') as f:
__taiutc = pickle.load(f)
# -*- coding: utf-8 -*-
from psycopg2.extras import DateTimeRange
#import logging
#logger = logging.getLogger('')
class mFile:
""" Базовый класс для загрузки файлов измерений.
"""
def get_trackid(self):
""" Метод. Получение изденификатора трека (проводки).
Return:
(int): идентификатор трека
.. literalinclude:: ../../../../fte/lib/libpy/mFile.py
:language: python
:lines: 9,21-26
:linenos:
:caption: get_trackid
"""
with self.cursor() as cr:
cr.callproc('nextval',('main.t_nip_res_trans_id_seq',))
_id, = cr.fetchone()
return _id
def partition_xd_tracks(self):
""" Метод. Секционирование таблиц для записи измерений в БД.
.. literalinclude:: ../../../../fte/lib/libpy/mFile.py
:language: python
:lines: 27,36-47
:linenos:
:caption: partition_xd_tracks
"""
# Секционирование таблиц
with self:
with self.cursor() as cr:
cr.execute("select * from template.partition_table('xd.t_tracks'::regclass,%s)",(self.tbeg,))
cr.execute("select * from template.partition_table('xd.t_tracks'::regclass,%s)",(self.tend,))
cr.execute("select * from template.partition_table('xd.t_filetrack'::regclass,%s)",(self.tbeg,))
cr.execute("select * from template.partition_table('xd.t_filetrack'::regclass,%s)",(self.tend,))
cr.execute("select * from template.partition_table('xd.t_trackproc'::regclass,%s)",(self.tbeg,))
cr.execute("select * from template.partition_table('xd.t_trackproc'::regclass,%s)",(self.tend,))
cr.execute("select * from template.partition_table('xd.t_binds'::regclass,%s)",(self.tbeg,))
cr.execute("select * from template.partition_table('xd.t_binds'::regclass,%s)",(self.tend,))
def set_tracks_crc(self):
""" Метод. Определение контрольной суммы трека (проводки)
.. literalinclude:: ../../../../fte/lib/libpy/mFile.py
:language: python
:lines: 48,57-68
:linenos:
:caption: set_tracks_crc
"""
#logger.debug('set_track_crc: %s,%s,%s',self.tels[0],self.tbeg,self.tend)
with self.cursor() as cr:
cr.execute("""
select json_object_agg(crc,track)
from (
select xd.track_crc(vc,meas) crc,track
from xd.t_tracks
where tm between %s and %s
and utr && %s
and dev = %s
) r
where crc is not null
""",(self.tbeg,self.tend,DateTimeRange(self.tbeg,self.tend,'[]'),self.tels[0],))
if cr.rowcount:
self.tracks_crc, = cr.fetchone()
def set_devid(self,telno):
""" Метод. Определение идентификатора средства по номеру телеграммы.
Args:
telno (int): номер телеграммы
.. literalinclude:: ../../../../fte/lib/libpy/mFile.py
:language: python
:lines: 69,81-
:linenos:
:caption: set_devid
"""
with self:
with self.cursor() as cr:
cr.callproc("main.devget_id",(telno,self.tbeg.date(),))
dev, = cr.fetchone()
if dev not in self.tels:
if len(self.tels):
raise RuntimeError('Недопустимое количество средств наблюденийв одном файле.')
self.tels.append(dev)
This diff is collapsed.
# -*- coding: utf-8 -*-
import psycopg2 as pg
import psycopg2.extras as pge
import json
from . import cmn
FORECAST_SQL = """
with tpos as(
select round(lat*10) lat,round(lon*10) lon
from main.devpos_llarec(%(telid)s,%(tm)s::date) p
)
select replace(replace(replace(json_build_array(
'n'||coalesce(w.pic_3h,w.pic_6h),
main,
temperature*.1,
pressure*.1,
humidity*.1,
dew_point*.1,
wind_speed*.1,
wind_deg*.1,
main.wind_ix(wind_speed*.1,wind_deg*.1),
cloudiness*.1,
fog*.1,
low_clouds*.1,
middle_clouds*.1,
high_clouds*.1,
coalesce(rain_3h,0)*.1,
coalesce(rain_6h,0)*.1
)::text,'[','{'),']','}'),'"','') wdata,
p.lat,
p.lon
from tpos p
left join main.t_weather w on (
circle(point(w.lat,w.lon),0.1) && circle(point(p.lat,p.lon),0.1)
and w.dt = date_round(%(tm)s,'3h'::interval)
)
left join main.t_weatherpic pi on (pi.id = coalesce(w.pic_3h,w.pic_6h))
limit 1
"""
def _dtmhook(obj):
""" Функция. Преобразование элементов словаря с типом значения строка в datetime.
Args:
obj (dict): Исходный элемент словаря.
Return:
(dict): Преобразованное элемент.
.. literalinclude:: ../../../../fte/lib/libpy/pgsetup.py
:language: python
:lines: 44,60-66
:linenos:
:caption: _dtmhook
"""
for k in obj:
if isinstance(obj[k],str):
dt = cmn.date_parse(obj[k].strip())
if dt != obj[k]:
obj[k] = dt
return obj
class _Json(pge.Json):
""" Класс. Адаптер конвертации *dict* в *json*.
Дополнительно конвертирует типы *date*, *time*, *datetime* и *decimal* по сравнению со стандартным преобразованием.
"""
def dumps(self, obj):
""" Метод. Конвертация экзепляра объекта в строку формата JSON
Args:
obj (dict): Объект
Return:
string: Строка в формате JSON.
.. literalinclude:: ../../../../fte/lib/libpy/pgsetup.py
:language: python
:lines: 71,86-90
:linenos:
:caption: dumps
"""
return json.dumps(
obj,
default = cmn.js_serialize,
separators =(',', ':',)
)
pge.register_default_json(loads=lambda x: json.loads(x, object_hook=_dtmhook))
pge.register_default_jsonb(loads=lambda x: json.loads(x, object_hook=_dtmhook))
pg.extensions.register_adapter(dict, _Json)
pge.register_uuid()
# -*- coding: utf-8 -*-
import logging
import json
from datetime import datetime,timedelta
from .xFile import xFile
logger = logging.getLogger('')
def to_timestamp(value):
""" Преобразование строки c датой в datetime
Args:
value(str): исходное значение
"""
try:
t = value.strip()
t = t if t.find('.') < 0 else t.ljust(26,'0')
return datetime.fromisoformat(t)
except:
return value
class reqFile(xFile):
""" Загрузка файла внешних заданий на наблюдения.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/reqFile.py
:language: python
:lines: 13,22-28
:linenos:
:caption: __init__
"""
super(reqFile,self).__init__()
self.parser = json.load
self.dbtype = 'req'
self.tbeg = self.tend = datetime.utcnow()
# Конвертируем в формат АЦУК
def to_acukrequest(self,req):
""" Метод. Конвертация во внутренний формаn
Args:
req (dict): исходные данные
Return:
(dict): ковертированные данные
.. literalinclude:: ../../../../fte/lib/libpy/reqFile.py
:language: python
:lines: 29,44-93
:linenos:
:caption: to_acukrequest
"""
_new = {
'ObjectNumber' :'object_number',
'LocalNumber' :'local_number',
'NoradNumber' :'norad_number',
'IntDes' :'object_id',
'Priority' :'priority',
}
def rename(c):
v = dict()
for k in c.keys():
v[_new[k]] = c[k]
return v
header = dict(
creation_date=req['TaskNum']['TaskEpoch'],
task_num=req['TaskNum']['Num'],
time_system="MSK",
shift_boss=req['TaskNum']["ShiftBoss"],
description=req['TaskNum']["TaskDescription"],
)
control = list()
for c in req["Directive"]["Control"]["ObjectInfos"]:
control.append(rename(c))
break_up = list()
for c in req["Directive"]["BreakUp"]["ObjectInfos"]:
break_up.append(rename(c))
deorbit = list()
for c in req["Directive"]["Deorbit"]["ObjectInfos"]:
deorbit.append(rename(c))
condition = list()
for c in req["Directive"]["Condition"]["ObjectInfos"]:
condition.append(rename(c))
approach = list()
for c in req["Directive"]["CollisionApproach"]["Pairs"]:
approach.append(dict(
first_object=rename(c['FirstObject']),
second_object=rename(c['SecondObject']),
approach_epoch=c['CollisionApproachEpoch']
))
return dict(
header=header,
request=dict(
control=control,
collision_approach=approach,
break_up=break_up,
deorbit=deorbit,
condition=condition,
)
)
def load_data(self):
""" Метод. Загрузка файла в БД.
.. literalinclude:: ../../../../fte/lib/libpy/reqFile.py
:language: python
:lines: 95,104-
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.check_privilege_file()
request = self.to_acukrequest(self.parser(self.f))
self.fileattr['shift_boss'] = request['header']['shift_boss']
self.fileattr['task_num'] = request['header']['task_num']
# формируем парамеры sql запросов
control_list = ','.join(map(str,[x['norad_number'] for x in request['request']['control']]))
condition_list = ','.join(map(str,[x['norad_number'] for x in request['request']['condition']]))
reqno = str(request['header']['task_num'])
tm_until = to_timestamp(request['header']['creation_date'])+timedelta(hours=21) # с учётом того, что нам передают в зоне MSK
with self:
self.create_doc()
with self.cursor() as cr:
cr.callproc("main.delivery_request_create",(reqno,control_list,tm_until,self.fid,'control',))
cr.callproc("main.delivery_request_create",(reqno,condition_list,tm_until,self.fid,'condition'))
for x in request['request']['collision_approach']:
lst = '{},{}'.format(x['first_object']['norad_number'],x['second_object']['norad_number'])
to_epoch = to_timestamp(x['approach_epoch'])
cr.callproc("main.delivery_request_create",(reqno,lst,to_epoch,self.fid,'collision_approach',True,))
This diff is collapsed.
# -*- coding: utf-8 -*-
import logging
import io
from datetime import datetime,timedelta
from .xFile import xFile
from .parsers import satcat,tle
from . import leapsec as lp
logger = logging.getLogger('')
def _epoch2tm(ep):
""" Функция. Преобразование TLE epoch в datetime
Args:
ep (float): время в формате TLE
Return:
(datetime): время
.. literalinclude:: ../../../../fte/lib/libpy/tleFile.py
:language: python
:lines: 11,26-30
:linenos:
:caption: _epoch2tm
"""
yd = int(ep)
yd += 100000 * (20 if yd < 57000 else 19)
sec = timedelta(seconds=round(round(ep % 1,8)*86400.0,6))
return datetime.strptime('%s' % yd,'%Y%j')+sec
class tleFile(xFile):
""" Запись в БД СО в формате TLE с сайта spacetrack.org.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/tleFile.py
:language: python
:lines: 35,44-49
:linenos:
:caption: __init__
"""
super(tleFile,self).__init__()
self.parser = tle.parsefile
self.dbtype = 'tle'
self.tbeg = datetime.max
self.tend = datetime.min
def getids(self,olen):
""" Метод. Получение идентификаторов орбит из БД.
Args:
olen (int): количество идентификаторов
Return:
(list): массив идентификаторов
.. literalinclude:: ../../../../fte/lib/libpy/tleFile.py
:language: python
:lines: 50,65-70
:linenos:
:caption: getids
"""
with self:
with self.cursor() as cr:
cr.execute("select nextval('tle.t_tle_id_seq'::regclass) from generate_series(1,%s)",(olen,))
ids = [x[0] for x in cr]
return ids
def load_data(self):
""" Метод. Загрузка файла в БД.
.. literalinclude:: ../../../../fte/lib/libpy/tleFile.py
:language: python
:lines: 71,80-137
:linenos:
:caption: load_data
"""
part = lambda x : datetime(x.year,x.month,1)
logger.info('Загрузка файла...')
self.create_loghandler()
self.check_privilege_file()
orblist = self.parser(self.f)
self.remove_loghandler()
if len(orblist):
ids = self.getids(len(orblist))
csv = dict()
for i,l in enumerate(orblist):
tm = _epoch2tm(l['ep'])
ttm = str(lp.utc2ttm(tm))
p = part(tm)
if p not in csv:
csv[p] = io.StringIO()
csv[p].write('\t'.join([str(ids[i]),ttm,l['num'],'1',l['l1'],l['l2']])+'\n')
self.tbeg = min(self.tbeg,tm)
self.tend = max(self.tend,tm)
#for c in csv:
# with open('%s.tle.txt' % c,'w') as f:
# csv[c].seek(0)
# f.write(csv[c].read())
self.fileattr['ko'] = len(orblist)
with self:
self.create_doc()
with self.cursor() as cr:
for c in csv:
csv[c].seek(0)
cr.callproc('tle.partition_tle',(c,))
tbl, = cr.fetchone()
#cr.copy_from(csv[c],tbl)
cr.copy_expert('copy {} from stdin'.format(tbl),csv[c])
# создаем задачу конвертации орбит...
cr.execute("insert into ctl.t_pendingtask(tp,icp) select 1,json_build_object('orbits',%s,'type','tle')",
(','.join(map(str,ids)),))
logger.info('Создана задача конвертации (%s) орбит TLE в J2000...',len(ids))
# создаем задачу отправки сообщений TC2LE
cr.execute("""
select ctl.create_tasks_mail(
4302
,_attach=>json_build_array(
json_build_object(
'fname',format('TC2LE_%%s_%%s.tc',to_char('now'::timestamp,'YYMMDD_HH24MISS'),77),
'data',array_agg(trim(l) order by id,substr(l,1,1))
)
)
)
from (
select id,unnest(array[tline1,tline2])l
from tle.t_tle
where id = any(%s)
) r
""",(ids,))
logger.info('Создана задача отправки сообщений TC2LE ОЭК ОКМ...')
class satcatFile(xFile):
""" Запись в БД списка КО с сайта spacetrack.org.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/tleFile.py
:language: python
:lines: 142,151-155
:linenos:
:caption: __init__
"""
super(satcatFile,self).__init__()
self.parser = satcat.parsefile
self.dbtype = 'sat'
self.tbeg = self.tend = datetime.utcnow()
def load_data(self):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/tleFile.py
:language: python
:lines: 156,165-
:linenos:
:caption: load_data
"""
part = lambda x : datetime(x.year,x.month,1)
logger.info('Загрузка файла...')
self.create_loghandler()
norads, catlist = self.parser(self.f)
self.remove_loghandler()
self.fileattr['ko'] = len(catlist)
with self:
self.create_doc()
with self.cursor() as cr:
cr.executemany("""
insert into tle.t_satcat(intldes,norad_cat_id,object_type,satname,country,launch,site,decay,period,inclination,apogee,perigee,comment,commentcode,rcsvalue,launch_year,launch_num,launch_piece)
values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
on conflict(norad_cat_id) do
update set
(intldes,object_type,satname,country,launch,site,decay,period,inclination,apogee,perigee,comment,commentcode,rcsvalue,launch_year,launch_num,launch_piece)
=
(excluded.intldes,excluded.object_type,excluded.satname,excluded.country,excluded.launch,excluded.site,excluded.decay,excluded.period,excluded.inclination,excluded.apogee,excluded.perigee,excluded.comment,excluded.commentcode,excluded.rcsvalue,excluded.launch_year,excluded.launch_num,excluded.launch_piece)
""",catlist)
logger.info('Изменена информация по %s объектам...',cr.rowcount)
cr.callproc("tle.check_satcat",(norads,))
# новый справочник КО
cr.execute("select main.obj_check_satcatrec(s.*) from main.v_satcat_changed s")
cr.execute("set work_mem to '256MB'")
cr.execute("refresh materialized view public.web_objects")
import re
_formats = {
re.compile(r'^\d{4}\-\d{2}\-\d{2}[T]\d{2}\:\d{2}\:\d{2}\.\d+$'):'%Y-%m-%dT%H:%M:%S.%f',
re.compile(r'^\d{4}\-\d{2}\-\d{2}\s\d{2}\:\d{2}\:\d{2}\.\d+$'):'%Y-%m-%d %H:%M:%S.%f',
re.compile(r'^\d{4}\-\d{2}\-\d{2}[T]\d{2}\:\d{2}\:\d{2}$'):'%Y-%m-%dT%H:%M:%S',
re.compile(r'^\d{4}\-\d{2}\-\d{2}\s\d{2}\:\d{2}\:\d{2}$'):'%Y-%m-%d %H:%M:%S',
re.compile(r'^\d{2}\/\d{2}\/\d{4}\s\d{2}\:\d{2}\:\d{2}\.\d+$'):'%d/%m/%Y %H:%M:%S.%f',
re.compile(r'^\d{2}\/\d{2}\/\d{4}\s\d{2}\:\d{2}\:\d{2}$'):'%d/%m/%Y %H:%M:%S',
}
def time_format(s):
""" Функция. Определение формата даты для функции datetime.srtptime
Args:
s (str): строка
.. literalinclude:: ../../../../fte/lib/libpy/tmFmt.py
:language: python
:lines: 12,24-
:linenos:
:caption: time_format
"""
for r in _formats:
m = r.search(s)
if m: return _formats[r]
raise RuntimeError('unknown datetime format for %s',s)
# -*- coding: utf-8 -*-
import logging
import json
import re
from io import StringIO
from datetime import datetime,timedelta
from .xFile import xFile
logger = logging.getLogger('')
class tmsdbFile(xFile):
""" Создание/обновление БД RRD для метео-устройства.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python
:lines: 97,106-108
:linenos:
:caption: __init__
"""
super(tmsdbFile,self).__init__()
self.tbeg, self.tend, self.times = datetime.max,datetime.min,set()
def round_time(self,dt=None, dateDelta=timedelta(minutes=1)):
""" Метод. Округление даты и времени до...
Args:
dt (datetime): исходное время
dateDelta (timedelta): единица округления
Return:
(datetime): округлённое значение
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python
:lines: 109,125-132
:linenos:
:caption: round_time
"""
roundTo = dateDelta.total_seconds()
if dt == None : dt = datetime.now()
seconds = (dt - dt.min).seconds
# // is a floor division, not a comment on following line:
rounding = (seconds+roundTo/2) // roundTo * roundTo
return dt + timedelta(0,rounding-seconds,-dt.microsecond)
def get_device(self):
""" Метод. Определение идентификатора устройства.
Return:
(int): идентификатор
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python
:lines: 133,145-155
:linenos:
:caption: get_device
"""
# Определяем идентификатор метеостанции... зашит в имени файла
telno = re.search(r'^\d+',self.args.fname).group(0)
atype = 40003 if self.dbtype == 'met' else 50003
with self.cursor() as cr:
cr.execute("select dev from main.t_devattr where atype=%s and unq and dtr @> 'now'::date and v = %s",(atype,telno,))
if not cr.rowcount:
raise RuntimeError('Не удалось идентифицировать метеостанцию %s.' % id)
dev, = cr.fetchone()
self.fileattr['dev'] = dev
return dev
def prepare_csv(self,dev,data,f):
""" Метод. Подготовка файла csv для загрузки в БД.
Args:
dev (int): идентификатор устройства
data (list): данные метео устройства
f (file): afqk csv
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python
:lines: 178,191-205
:linenos:
:caption: update_rrd
"""
for l in data:
dt = self.round_time(datetime.strptime(l['Date'],'%Y-%m-%dT%H:%M:%S'))
# Отсекаем дубликаты...
if dt in self.times: continue
self.times.add(dt)
self.tbeg, self.tend = min(self.tbeg,dt), max(self.tend,dt)
r = self.mkrec(l,dt,dev)
if r: f.write(r)
def load_data(self):
""" Метод. Загрузка метеоданных в файл RRD.
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python
:lines: 206,215-223
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.check_privilege_file()
dev = self.get_device()
self.check_privilege_device(dev)
data = json.load(self.f)[self.datakey]
self.create_loghandler()
f = StringIO()
with self:
self.prepare_csv(dev,data,f)
f.seek(0)
with self.cursor() as cr:
cr.copy_expert('copy {} from stdin'.format(self.table),f)
self.create_doc()
self.remove_loghandler()
class vproFile(tmsdbFile):
""" Загрузка данных устройства Davis Vantage Pro.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python
:lines: 229,238-240
:linenos:
:caption: __init__
"""
super(vproFile,self).__init__()
self.dbtype = 'met'
self.datakey = 'DavisVantage'
self.table = 'xd.t_meteo_davis'
def mkrec(self,l,dt,dev):
ikeys = (
"Barometer",
"TInside","InsideHum","InsideDewPoint", # реально 2-х последних полей нет в файле
"TOutside","HumOutside","DewPoint",
"WindSpeed","WSA10min","WindDirection",
"RainRate","StormRain",
"RainDay","RainMonth","RainYear",
# "WSA2min","WindGust10min","WindDirectionWindGust","RainLast15min","RainLastHour","UV",
# "SolarRadiation","ConsoleVoltage","SunRise","SunSet",
)
if isinstance(l["BarTrend"],int):
values = [dt,dev,l["BarTrend"]]
for k in ikeys:
values.append(l.get(k,r'\N'))
return '{}\n'.format('\t'.join(map(str,values)))
else:
logger.warning('BarTrend:"%s" is undefined and skipping...',l['BarTrend'])
class cwFile(tmsdbFile):
""" Загрузка данных устройства Cloud Watcher.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python
:lines: 245,254-
:linenos:
:caption: __init__
"""
super(cwFile,self).__init__()
self.dbtype = 'sky'
self.datakey = 'CloudWatcher'
self.table = 'xd.t_meteo_cloudw'
def mkrec(self,l,dt,dev):
ikeys = (
"TSkyCorrected",
"TSkyRaw",
"RainFr",
"LDR", # Видимо BrightnessValue
"TAmbient",
"RainHeatingPercent", # Реально этого поля нет в записи
"TRainSensor",
"TimeoutErrors", # Реально этого поля нет в записи
"RHValue", # Реально этого поля нет в записи
# "PoverVoltage","SwitchStatus"
)
values = [dt,dev]
for k in ikeys:
values.append(l.get(k) if l.get(k) else r'\N')
return '{}\n'.format('\t'.join(map(str,values)))
# -*- coding: utf-8 -*-
import io
import logging
from datetime import datetime
from .xFile import xFile
logger = logging.getLogger('')
class forecastFile(xFile):
""" Зарузка файла прогноза метеоусловий наблюдений.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/weaFile.py
:language: python
:lines: 14,23-26
:linenos:
:caption: __init__
"""
super(forecastFile,self).__init__()
self.dbtype = 'wf'
self.tbeg = self.tend = datetime.utcnow()
def load_data(self):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/weaFile.py
:language: python
:lines: 27,36-
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.check_privilege_file()
self.f.seek(0)
with self:
with self.cursor() as cr:
#cr.copy_from(self.f,'main.t_weather_load',sep=',',columns=(
# 'dt', 'lat', 'lon', 'alt', 'pressure', 'temperature', 'humidity',
# 'dew_point', 'wind_speed', 'wind_deg', 'cloudiness', 'fog', 'low_clouds',
# 'middle_clouds', 'high_clouds', 'rain_3h', 'pic_3h', 'rain_6h', 'pic_6h',
#))
cr.copy_expert("""
copy main.t_weather_load(
dt, lat, lon, alt, pressure, temperature, humidity,
dew_point, wind_speed, wind_deg, cloudiness, fog, low_clouds,
middle_clouds, high_clouds, rain_3h, pic_3h, rain_6h, pic_6h
) from stdin with delimiter ','
""",self.f)
cr.execute("select min(dt),max(dt) from main.t_weather_load")
self.tbeg, self.tend = cr.fetchone()
cr.execute("""
insert into main.t_weather
select * from main.t_weather_load
on conflict on constraint t_weather_pk do
update set
(pressure,temperature,humidity,dew_point,wind_speed,wind_deg,
cloudiness,fog,low_clouds,middle_clouds,high_clouds,
rain_3h,pic_3h,rain_6h,pic_6h,created)
=
(excluded.pressure,excluded.temperature,excluded.humidity,excluded.dew_point,excluded.wind_speed,excluded.wind_deg,
excluded.cloudiness,excluded.fog,excluded.low_clouds,excluded.middle_clouds,excluded.high_clouds,
excluded.rain_3h,excluded.pic_3h,excluded.rain_6h,excluded.pic_6h,now()::timestamp)
""")
self.create_doc()
with self.cursor() as cr:
cr.execute("truncate main.t_weather_load")
\ No newline at end of file
This diff is collapsed.
# -*- coding: utf-8 -*-
import logging
import io
from datetime import datetime,date,time
from decimal import Decimal
import json
from .xFile import xFile
from .parsers import zorbfile,zrptfile
logger = logging.getLogger('')
_2json = lambda obj: (
obj.isoformat()
if isinstance(obj, (datetime,date,time,))
else str(obj) if isinstance(obj, Decimal)
else None
)
class zoFile(xFile):
""" Загрузка файла со списком орбит КО из внешнего источника.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/zFile.py
:language: python
:lines: 24,33-35
:linenos:
:caption: __init__
"""
super(zoFile,self).__init__()
self.parser = zorbfile.parsefile
def _check_objects(self):
""" Метод. Корректировка справочников КО.
.. literalinclude:: ../../../../fte/lib/libpy/zFile.py
:language: python
:lines: 36,45-70
:linenos:
:caption: _check_objects
"""
# ищем предыдущий загруженный файл...
with self.cursor() as cr:
cr.execute("""
select p.fid,lower(p.utr),lower(c.utr)
from main.t_files p
join main.t_files c on (c.ftype = p.ftype and c.tm > p.tm )
where c.fid = %(fid)s
order by p.tm desc
limit 1;
""",dict(fid=self.fid))
if cr.rowcount:
pfid,ptm,tm = cr.fetchone()
if ptm > tm:
logger.debug('Нечего делать, существуют более поздние файлы ...')
return
# проверяем объекты снятые/поставленные на сопровождение и изменение номеров ИПМ
cr.callproc("main.check_exobj",(self.fid,pfid,))
# и существенные изменения в элементах орбит
cr.callproc("main.check_changedexobj",(self.fid,pfid,))
# новый справочник 08.04.2022
# obj_check_exorbits_die вызывается из pg_cron
cr.callproc('main.obj_check_exorbits_file',(self.fid,tm,))
cr.execute("set work_mem to '256MB'")
cr.execute("refresh materialized view public.web_objects")
def load_data(self):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/zFile.py
:language: python
:lines: 71,80-123
:linenos:
:caption: load_data
"""
dbt = dict(ALL='za',GEO='zg',HEO='zv',LEO='zn',CU='cu',PUBLIC='zo')
logger.info('Загрузка файла...')
self.create_loghandler()
otype,tm,orblist = self.parser(self.f)
self.remove_loghandler()
self.dbtype = dbt[otype]
self.check_privilege_file()
self.tbeg = self.tend = tm if tm else datetime.utcnow()
self.fileattr['ko'] = len(orblist)
part = set()
csv = io.StringIO()
for no,l in enumerate(orblist):
if l['tm'].year < 2004:
logger.warning('Incorrect data line: %s tm:%s',no,l['tm'])
continue
rec = '\t'.join(map(str,(self.fid,l['unko'],l['tm'],l['nko'],l['vc'],)))+'\n'
csv.write(rec)
p = l['tm'].year*10 + (l['tm'].month/4 + 1)
if p not in part:
with self:
with self.cursor() as cr:
cr.execute("select template.partition_table('main.t_exorbits'::regclass,%s)",(l['tm'],))
part.add(p)
csv.seek(0)
with self:
self.create_doc()
with self.cursor() as cr:
#cr.copy_from(csv,'main.t_exorbits')
cr.copy_expert('copy main.t_exorbits from stdin',csv)
if not self.args.archive:
# запуск задач конвертации орбит КО во внутренний формат
cr.execute("select ctl.create_tasks_zo_translate(%(fid)s)",dict(fid=self.fid))
logger.info('Создана задача конвертации для %s объектов...',len(orblist))
# рассылка СО (заблокированно 20.05.2016) # разблокировано для 128.128 13.10.2021
#cr.execute("select * from ctl.create_tasks_sm_zorbits(%(classid)s,%(fid)s)",dict(classid=self.acl_classid,fid=self.fid))
logger.info('Создана задача доставки почтовых сообщений документа номер:%s ...',self.fid)
if self.dbtype != 'cu': self._check_objects()
# Орбиты для автономной идентификации
if self.dbtype in ('zo','zn','cu'):
with self.cursor() as cr:
vc = 34 if self.dbtype == 'zn' else 17
if self.dbtype != 'cu':
cr.execute("delete from xd.t_statevc_fastident where vc=%s",(vc,))
else:
cr.execute("select main.obj_check_exorbitsrec(r.*,%s) from main.t_exorbits r where fid=%s",
(self.dbtype,self.fid,))
cr.execute("""
insert into xd.t_statevc_fastident(vc,koid,norad,kiam,unko,tm,params)
select %s vc,koid,norad,kiam,unko,tm,
vc[1:18]::float[]
||regexp_match(vc[19],'(\d+)\((\d+\.\d+)\)')::float[]
||vc[20]::float
||date_part('epoch',to_timestamp(vc[21],'DDMMYYYY HH24MISS')::timestamp)
||date_part('epoch',to_timestamp(vc[22],'DDMMYYYY HH24MISS')::timestamp)
||vc[23]::float
from (
select mp3.koid,r.unko,o2.nko::integer norad,o4.nko::integer kiam,r.tm,to_array(r.vc)::text[] vc
from main.t_exorbits r
join main.t_object o on (r.unko::text = o.nko and o.dir=3)
join main.t_objmap mp3 on (mp3.target = o.koid and mp3.tmr @> r.tm)
left join main.t_objmap mp2 on (mp2.koid = mp3.koid and mp2.tmr @> r.tm and mp2.dir=2)
left join main.t_object o2 on (o2.koid = mp2.target)
left join main.t_objmap mp4 on (mp4.koid = mp3.koid and mp4.tmr @> r.tm and mp4.dir=4)
left join main.t_object o4 on (o4.koid = mp4.target)
where fid = %s
) r
on conflict (koid,vc) do
update set (norad,kiam,unko,tm,params) =
(excluded.norad,excluded.kiam,excluded.unko,excluded.tm,excluded.params)
""",(vc,self.fid,))
if self.dbtype == 'zn':
# обновляем группу 206 для отправки орбит в формате ЗНХ
cr.execute("""
update main.t_objgroup
set members = r.mb
from (
select array_agg(distinct koid) mb
from main.t_exorbits x
join main.t_object o on (x.nko = o.nko::integer and o.dir=4)
where fid=%s
) r
where gro=206
""",(self.fid,))
class zrFile(xFile):
""" Загрузка файла отчёта по сеансу наблюдений из внешних источников.
"""
def __init__(self):
""" Конструктор.
.. literalinclude:: ../../../../fte/lib/libpy/zFile.py
:language: python
:lines: 129,138-143
:linenos:
:caption: __init__
"""
super(zrFile,self).__init__()
self.parser = zrptfile.parsefile
self.dbtype = 'zr'
self.atype = [20020,500020]
def load_data(self):
""" Метод. Загрука файла.
.. literalinclude:: ../../../../fte/lib/libpy/zFile.py
:language: python
:lines: 144,153-
:linenos:
:caption: load_data
"""
logger.info('Загрузка файла...')
self.create_loghandler()
tels, self.tbeg, self.tend, binds, self.fileattr = self.parser(self.f)
telset = set()
self.check_privilege_file()
for t in tels:
telid = self.get_dev(t,self.tbeg.date())
self.check_privilege_device(telid)
telset.add(telid)
self.remove_loghandler()
self.tels = list(telset)
csv = io.StringIO()
for b in binds:
rec = (self.fid,b,'{%s}' % ','.join(map(str,binds[b]['tracks'])),json.dumps(binds[b]['params'],default=_2json,separators=(',', ':',)),)
csv.write('\t'.join(map(str,rec))+'\n')
csv.seek(0)
with self:
with self.cursor() as cr:
cr.execute("""update main.t_files set state = state | 512
where ftype in ('om','tc','mp') and tels && %s and utr && tsrange(%s,%s,'[]')
and utr != tsrange(null,null)""",
(self.tels,self.tbeg,self.tend,))
self.state = 0 if cr.rowcount else 8
self.create_doc()
with self.cursor() as cr:
#cr.copy_from(csv,'main.t_exreport',columns=('fid','kiamno','nipno','params',))
cr.copy_expert('copy main.t_exreport(fid,kiamno,nipno,params) from stdin',csv)
# рассылка отчётов
#cr.callproc("main.process_report",(self.fid,))
cr.execute("select * from ctl.create_tasks_mail(1020,%s,%s)",(self.tels[0],str(self.fid),))
#!/usr/local/bin/python3
# -*- coding: utf-8 -*-
from libpy.fileFactory import factory,detect_file
from libpy.fileFactory import factory
import sys
import os
import io
......@@ -9,45 +9,52 @@ import re
import json
from collections import namedtuple
import logging
import psycopg2 as pg
import aiofiles
import asyncpg as pg
import uvloop
_BASEPATH_ = os.environ.get('LOADER_DIR','/usr/home/worker/data/')
logger = logging.getLogger('')
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
if __name__ == '__main__':
workdir = '/tmp/monitor/'
user = 'worker'
port = 54327
#host = '/tmp'
host = '10.255.128.128'
appname = 'loader'
async def patch_davisfile(fin,fname):
async with aiofiles.open(_BASEPATH_+fname,mode='r') as f:
async for l in f:
l = l.replace(
'"\\"', '-1').replace(
'"\\ "','-1').replace(
'"-"', '0').replace(
'"/"', '1').replace(
'"^"', '2').replace(
'"|"', '-2')
if l == ']}{"DavisVantage":[\n': l = ',\n'
fin.write(l)
fin.seek(0)
import logging.config
# load config from file
# logging.config.fileConfig('logging.ini', disable_existing_loggers=False)
# or, for dictConfig
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': False, # this fixes the problem
'formatters': {
'standard': {
'format': '%(asctime)s %(levelname)s:%(module)s:%(lineno)s: %(message)s',
'datefmt':'%Y-%m-%d %H:%M:%S'
},
},
'handlers': {
'default': {
'level':'DEBUG',
'formatter':'standard',
'class':'logging.StreamHandler',
},
},
'loggers': {
'': {
'handlers': ['default'],
'level': 'DEBUG',
'propagate': True
}
}
})
from libpy.dblog import *
logger = logging.getLogger('')
async def patch_cwfile(fin,fname):
async with aiofiles.open(_BASEPATH_+fname,mode='r') as f:
async for l in f:
if l == ']}{"CloudWatcher":[\n': l = ',\n'
fin.write(l)
fin.seek(0)
async def read_file(fin,fname):
async with aiofiles.open(_BASEPATH_+fname,mode='r') as f:
d = await f.read()
fin.write(d)
fin.seek(0)
async def main():
try:
logger.info("Обработка электронной почты. Старт...")
logger.debug("build date:")
......@@ -69,46 +76,34 @@ if __name__ == '__main__':
logger.debug(js)
# Для корректного завершения задачи.
print(js['fname'])
fin = io.StringIO()
with open(_BASEPATH_+fullname) as f:
# Гвозь: исправление ошибки в файле метеоданных
if re.search(r'\d+[_]\d+[_]weather[.]log',fullname):
for l in f:
l = l.replace(
'"\\"', '-1').replace(
'"\\ "','-1').replace(
'"-"', '0').replace(
'"/"', '1').replace(
'"^"', '2').replace(
'"|"', '-2')
if l == ']}{"DavisVantage":[\n': l = ',\n'
fin.write(l)
await read_davisfile(fin,fullname)
elif re.search(r'\d+[_]\d+[_]sky[.]log',fullname):
for l in f:
if l == ']}{"CloudWatcher":[\n': l = ',\n'
fin.write(l)
await read_cwfile(fin,fullname)
else:
fin.write(f.read())
await read_file(fin,fullname)
ftype = detect_file(fin)
ftype = factory.detect_file(fin)
if ftype:
logger.info('Определён тип файла %s',ftype)
doc = factory.create(ftype)
cn = await pg.connect(dsn,connection_class=doc)
try:
args = namedtuple('Generic',js.keys())(**js)
doc.set_file(fin,args)
doc.load_data()
logger.info('Обработка электронной почты завершена.')
await cn.load(fin,args)
except:
message = f"Ошибка загрузки файла {fn}"
async with cn.transaction():
await cn.execute("select * from dbq.put_task_sendtelegram($1)",[message])
logger.exception(message)
finally:
await cn.close()
else:
logger.warning('Тип файла %s не определён.',fn)
logger.info('Обработка электронной почты завершена.')
except pg.IntegrityError as e:
doc.rollback()
if e.pgerror.find('t_files_crc_unq') > 0:
logger.warning('Файл %s уже загружен',fn)
else:
logger.exception('Ошибка обработки электронной почты.')
except:
if 'doc' in locals() and doc:
doc.rollback()
logger.exception('Ошибка обработки электронной почты.')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment