Commit 88125345 authored by Takhir Fakhrutdinov's avatar Takhir Fakhrutdinov

Загрузка метео

parent 470c08ac
[{"header":{"originator":"EOP-2M-1","creation_date":"2022-10-08T10:06:50","message_type":"fci","message_id":"1665223610302-0"},"fci":{"freespace":{"nightid":"20221008","ttl":1200,"meas":{"used":72.92951583862305,"free":3752.9608612060547},"archive":{"used":71856.13441085815,"free":83210.9536819458},"archive_writable":true,"clients":[{"siteid":10341,"used":72.92951583862305,"free":3752.9608612060547},{"siteid":10342,"used":72.92951583862305,"free":3752.9608612060547},{"siteid":10343,"used":72.92951583862305,"free":3752.9608612060547},{"siteid":10344,"used":72.92951583862305,"free":3752.9608612060547}]}}}]
\ No newline at end of file
[{"header":{"creation_date":"2022-10-08T16:49:29.383494","time_system":"UTC","originator":"EOP-2M-1","message_id":"1665247769390-0","comment":"Weather data from Imeteolabs PWS-600","message_type":"meteo"},"meteo":{"metadata":{"station":"Imeteolabs PWS-600","serial_number":"12345678","raw_data":"b'0R1,Dn=011D,Dm=050D,Dx=090D,Sn=000.5M,Sm=001.0M,Sx=002.0M\\r\\n0R2,Ta=013.4C,Ua=083.5P,Pa=000947.9H\\r\\n0R3,Rc=0244.8M,Ri=00000.0M\\r\\n'"},"data":{"date":"2022-10-08T16:49:29.383374","ambient_t":13.4,"ambient_hum":83.5,"pressure":947.9,"wind_dir":50,"wind_speed":1,"precipitation":0,"dew_point":null,"sky_t":null,"precipitation_last_24h":244.8}}}]
\ No newline at end of file
...@@ -155,13 +155,17 @@ def _create_ctleFile(): ...@@ -155,13 +155,17 @@ def _create_ctleFile():
return ctleFile return ctleFile
def _create_cwFile(): def _create_cwFile():
from .tmsdbFile import cwFile from .meteoFile import cwFile
return cwFile return cwFile
def _create_vproFile(): def _create_vproFile():
from .tmsdbFile import vproFile from .meteoFile import vproFile
return vproFile return vproFile
def _create_m2mFile():
from .meteoFile import eop2mFile
return eop2mFile
def _create_reqFile(): def _create_reqFile():
from .reqFile import reqFile from .reqFile import reqFile
return reqFile return reqFile
...@@ -412,3 +416,72 @@ frame = { ...@@ -412,3 +416,72 @@ frame = {
} }
factory.register_file('LCZ', _create_lczFile, schema=frame) factory.register_file('LCZ', _create_lczFile, schema=frame)
meteo = {
"type": "array",
"items": {
"allOf": [
{
"type": "object",
"required": ["header","meteo"],
"properties": {
"header": {
"type": "object",
"required": ["originator","message_type","message_id"],
"properties": {
"originator": {"type": "string"},
"message_type": {"type": "string","enum":["meteo"]},
"message_id": {"type": "string"},
"time_system": {"type": "string","enum":["UTC"]},
"comment": {"type": "string"}
}
},
"meteo":{
"type":"object",
"required":["data","metadata"],
"properties": {
"data":{
"type": "object",
"required": [
"precipitation","date","precipitation_last_24h","pressure","wind_dir","ambient_t",
"sky_t","dew_point","ambient_hum","wind_speed"
],
"properties": {
"date": {"$ref": "#/$defs/timestamp"},
"precipitation": {"type": "number"},
"precipitation_last_24h": {"type": "number"},
"pressure": {"type": "number"},
"ambient_t": {"type": "number"},
"ambient_hum": {"type": "number"},
"wind_dir": {"type": "number"},
"wind_speed": {"type": "number"},
"sky_t": {"$ref": "#/$defs/number"},
"dew_point": {"$ref": "#/$defs/number"}
}
},
"metadata":{
"type": "object",
"required": ["station","serial_number"],
"properties": {
"station": {"type": "string"},
"serial_number": {"type": "string"}
}
}
}
}
}
}
]
},
"$defs": {
"timestamp": {"type" : "string","pattern" : "^(\\d{4})-(\\d{2})-(\\d{2})([T ](\\d{2}):(\\d{2})(:(\\d{2}(?:\\.\\d+)?))?)?$"},
"number":{
"anyOf" : [
{ "type": "number"},
{ "type" : "null"}
]
}
}
}
factory.register_file('M2M', _create_m2mFile, schema=meteo)
...@@ -9,7 +9,7 @@ from datetime import datetime,timedelta ...@@ -9,7 +9,7 @@ from datetime import datetime,timedelta
logger = logging.getLogger('') logger = logging.getLogger('')
class tmsdbFile(baseFile): class meteoBaseFile(baseFile):
""" Создание/обновление БД RRD для метео-устройства. """ Создание/обновление БД RRD для метео-устройства.
""" """
...@@ -49,7 +49,7 @@ class tmsdbFile(baseFile): ...@@ -49,7 +49,7 @@ class tmsdbFile(baseFile):
rounding = (seconds+roundTo/2) // roundTo * roundTo rounding = (seconds+roundTo/2) // roundTo * roundTo
return dt + timedelta(0,rounding-seconds,-dt.microsecond) return dt + timedelta(0,rounding-seconds,-dt.microsecond)
async def get_device(self): async def get_device(self,model,serial_number):
""" Метод. Определение идентификатора устройства. """ Метод. Определение идентификатора устройства.
Return: Return:
...@@ -61,38 +61,73 @@ class tmsdbFile(baseFile): ...@@ -61,38 +61,73 @@ class tmsdbFile(baseFile):
:linenos: :linenos:
:caption: get_device :caption: get_device
""" """
# Определяем идентификатор метеостанции... зашит в имени файла dev = await self.fetchval("""
telno = re.search(r'^\d+',self.args.fname).group(0) select dev
atype = 40003 if self.dbtype == 'met' else 50003 from (
dev = await self.fetchval("select dev from main.t_devattr where atype=$1 and unq and dtr @> 'now'::date and v = $2",atype,telno) select a.dev,l.v model
from main.t_devattr a
join main.t_attrtypelist l using(atype)
where atype in (40002,50002)
and l.rec = (a.v)::integer
) r0 join (
select a.dev,a.v serial_number
from main.t_devattr a
where atype in (40003,50003)
) r1 using(dev)
where model=$1 and serial_number=$2
""",model,serial_number)
if not dev: if not dev:
raise RuntimeError(f"Не удалось идентифицировать метеостанцию {telno}.") raise RuntimeError(f"Не удалось идентифицировать метеостанцию {telno}.")
self.fileattr['dev'] = dev
return dev return dev
async def get_pkeys(self,dev):
""" Метод. Определение списка метеопараметров
def prepare_csv(self,dev,data,f): Return:
""" Метод. Подготовка файла csv для загрузки в БД. (object): Словарь параметров
Args: .. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
dev (int): идентификатор устройства :language: python
data (list): данные метео устройства :lines: 133,145-155
f (file): afqk csv :linenos:
:caption: get_device
"""
pkeys = await self.fectval("""
select json_object_agg(v,atype)
from main.t_devattr
join main.t_attrtypes using(atype)
where dev = $1
and parent in (42000,52000)
and dtr @> now()::date
""",dev)
return json.loads(keys)
class meteoEopFiles(meteoBaseFile):
""" Создание/обновление БД RRD для метео-устройства.
"""
def prepare_csv(self,f,dev,data,keys):
""" Метод. Запись данных в csv файл
Return:
(object): Словарь параметров
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py .. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python :language: python
:lines: 178,191-205 :lines: 133,145-155
:linenos: :linenos:
:caption: update_rrd :caption: get_device
""" """
for l in data: for d in data:
dt = self.round_time(datetime.strptime(l['Date'],'%Y-%m-%dT%H:%M:%S')) tm = self.round_time(datetime.fromisoformat(d['Date'])).replace()
# Отсекаем дубликаты... if tm not in self.times:
if dt in self.times: continue self.times.add(tm)
self.times.add(dt)
self.tbeg, self.tend = min(self.tbeg,dt), max(self.tend,dt) self.tbeg, self.tend = min(self.tbeg,dt), max(self.tend,dt)
r = self.mkrec(l,dt,dev) for k,atype in keys.items():
if r: self.write(f,r) self.write(f,(tm,dev,atype,d[k])
async def load(self,fd,args): async def load(self,fd,args):
""" Метод. Загрузка файла. """ Метод. Загрузка файла.
...@@ -105,21 +140,24 @@ class tmsdbFile(baseFile): ...@@ -105,21 +140,24 @@ class tmsdbFile(baseFile):
""" """
logger.info('Загрузка файла...') logger.info('Загрузка файла...')
await super().load(fd,args) await super().load(fd,args)
dev = await self.get_device() serial_number = re.search(r'^\d+',self.args.fname).group(0)
dev = await self.get_device(self.model,serial_number)
await self.check_priv_device(dev) await self.check_priv_device(dev)
self.fileattr['dev'] = dev
pkeys = await self.get_keys(dev)
fo = io.BytesIO() fo = io.BytesIO()
data = json.load(fd)[self.datakey] data = json.load(fd)[model]
self.prepare_csv(dev,data,fo) self.prepare_csv(fo,dev,data,pkeys)
self.fid = await self.getfid() self.fid = await self.getfid()
fo.seek(0) fo.seek(0)
async with self.transaction(): async with self.transaction():
await self.create_doc() await self.create_doc()
await self.copy_to_table(self.table,source=fo,schema_name='xd',delimiter='\t') await self.copy_to_table('t_meteo',source=fo,schema_name='xd',delimiter='\t')
class vproFile(tmsdbFile): class vproFile(meteoEopFile):
""" Загрузка данных устройства Davis Vantage Pro. """ Загрузка данных устройства Davis Vantage Pro.
""" """
...@@ -134,31 +172,10 @@ class vproFile(tmsdbFile): ...@@ -134,31 +172,10 @@ class vproFile(tmsdbFile):
""" """
super().__init__(*args,**kwargs) super().__init__(*args,**kwargs)
self.dbtype = 'met' self.dbtype = 'met'
self.datakey = 'DavisVantage' self.model = 'DavisVantage'
self.table = 't_meteo_davis'
class cwFile(meteoEopFile):
def mkrec(self,l,dt,dev): """ Загрузка данных устройства Davis Vantage Pro.
ikeys = (
"Barometer",
"TInside","InsideHum","InsideDewPoint", # реально 2-х последних полей нет в файле
"TOutside","HumOutside","DewPoint",
"WindSpeed","WSA10min","WindDirection",
"RainRate","StormRain",
"RainDay","RainMonth","RainYear",
# "WSA2min","WindGust10min","WindDirectionWindGust","RainLast15min","RainLastHour","UV",
# "SolarRadiation","ConsoleVoltage","SunRise","SunSet",
)
if isinstance(l["BarTrend"],int):
values = [dt,dev,l["BarTrend"]]
for k in ikeys:
values.append(l.get(k,r'\N'))
return values
else:
logger.warning('BarTrend:"%s" is undefined and skipping...',l['BarTrend'])
class cwFile(tmsdbFile):
""" Загрузка данных устройства Cloud Watcher.
""" """
def __init__(self,*args,**kwargs): def __init__(self,*args,**kwargs):
...@@ -166,30 +183,69 @@ class cwFile(tmsdbFile): ...@@ -166,30 +183,69 @@ class cwFile(tmsdbFile):
.. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py .. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
:language: python :language: python
:lines: 245,254- :lines: 229,238-240
:linenos: :linenos:
:caption: __init__ :caption: __init__
""" """
super().__init__(*args,**kwargs) super().__init__(*args,**kwargs)
self.dbtype = 'sky' self.dbtype = 'sky'
self.datakey = 'CloudWatcher' self.model = 'CloudWatcher'
self.table = 't_meteo_cloudw'
class eop2mFile(meteoBaseFile):
def mkrec(self,l,dt,dev):
ikeys = ( """ Загрузка данных ЭОП 2М
"TSkyCorrected",
"TSkyRaw", """
"RainFr", def __init__(self,*args,**kwargs):
"LDR", # Видимо BrightnessValue """ Конструктор.
"TAmbient",
"RainHeatingPercent", # Реально этого поля нет в записи .. literalinclude:: ../../../../fte/lib/libpy/rrdFile.py
"TRainSensor", :language: python
"TimeoutErrors", # Реально этого поля нет в записи :lines: 229,238-240
"RHValue", # Реально этого поля нет в записи :linenos:
# "PoverVoltage","SwitchStatus" :caption: __init__
) """
values = [dt,dev] super().__init__(*args,**kwargs)
for k in ikeys: self.dbtype = 'm2m'
values.append(l.get(k) if l.get(k) else r'\N')
return values async def load(self,fd,args):
""" Метод. Загрузка файла.
.. literalinclude:: ../../../../fte/lib/libpy/weaFile.py
:language: python
:lines: 27,36-
:linenos:
:caption: load_data
"""
from collections import namedtuple
logger.info('Загрузка файла...')
await super().load(fd,args)
data = json.load(fd)
fo,devs = io.BytesIO(), set()
for m in data:
r = m.get('meteo')
if r:
model,serial_number = r['metadata']['station'],r['metadata']['serial_number']
dev = await self.get_device(model,serial_number)
await self.check_priv_device(dev)
if dev not in devs:
devs.add(dev)
pkeys = await self.get_pkeys(dev)
tm = datetime.fromisoformat(r['data']['date']).replace(microsecond=0)
self.tbeg, self.tend = min(self.tbeg,tm), max(self.tend,tm)
d = r['data']
rec = [ (tm,dev,atype,d[k],) for k,atype in pkeys.items() if k in d and d.get(k) is not None ]
for l in rec:
self.write(fo,l)
fid = await self.getfid()
self.fileattr = dict(devs=list(devs))
fo.seek(0)
async with self.transaction():
await self.create_doc()
await self.copy_to_table('t_meteo',source=fo,schema_name='xd',delimiter='\t')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment