Until now the Atlas emitted NEW_LOOP_PACKET events about every 7 seconds, 
so I had my indoor weather data arduino (wpa) set to emit at the same 
interval.  I just sped that up to every 3 seconds (just the wpa emitter).  
You can "$ cat /dev/ttyACMwa" which prints the serial data output over usb 
from the arduino (wpa) directly to the linux console.  That shows (the  new 
interval) is 3 seconds.  I will run it over a long time to see if there are 
any long intervals.  But what I suspect is happening is ... I have three 
weather stations.  I built two with Adafruit parts plus the indoors part of 
this one (wpa/Atlas).  The "merged" reports are run as a fourth instance of 
weewx which is not the server, just "wee_reports_merge" which is just 
"wee_report" with the configuration file for the merged report.  The worst 
thing is that this was being run every 90 seconds (DUH).  I slowed it down 
to every 10 minutes.  The archive interval for Atlas/wpa is 5 minutes.  

The hard part was reading the serial port to get the inside data (one line 
of json data).
(1) flush input buffer
(2) read discard first line in case it is partial
(3) read next line or timeout
the readline() commands are blocking with timeout.  Not sure how to do non 
blocking but see this discussion:
https://stackoverflow.com/questions/1093598/pyserial-how-to-read-the-last-line-sent-from-a-serial-device
they all seem to think that at least part of the procedure is blocking (not 
all !)

it works so far.  will leave it to run overnight        service 
attached.     service processes  inside data in NEW_LOOP_PACKET as you 
described.  weewx uses service plus original unmodified weewx-sdr.py driver 
for outside data.

On Sunday, April 30, 2023 at 4:47:56 PM UTC-7 gjr80 wrote:

> On Sunday, 30 April 2023 at 13:42:56 UTC+10 william...@att.net wrote:
>
> Just one question please :-).  Suppose the read of the arduino could 
> possibly take a relatively long time, and you want to have a timeout after 
> which it gives up and saves None/NULL for the indoor data.  
>
> What is the max timeout that would be reasonable relative to the archive 
> interval ?
>
>  
> I would be more concerned about the loop packet interval than the archive 
> interval. Your initial post indicated the Atlas emits packets every 7 
> seconds so the SDR driver will be emitting loop packets every 7 odd 
> seconds. If you have a plain vanilla WeeWX install WeeWX will not be doing 
> much else during the loop packet interval other than calculating derived 
> obs so dwelling for up to, say, a second should have no significant effect. 
>
> What happens if the delay goes past the end of the main archive interval 
> the event you are handling was in?
>
>  
> Delaying past the end of the archive interval will not have a significant 
> impact (within reason). Say your service delays 20 seconds past the end of 
> the archive period, when the driver gets it's turn again it will emit 
> another loop packet which will cause an archive record to be generated by 
> WeeWX and ultimately the report cycle is run maybe 20+ seconds later than 
> usual (note the exact behaviour of the driver is very much driver 
> dependent; some drivers may skip loop packets, others may emit a loop 
> packet immediately and yet others may delay emitting a loop packets - the 
> SDR driver is threaded and I believe it is the former). So really you will 
> probably only noticed delayed report output.
>
> Where you will probably get more problems from delaying the WeeWX main 
> loop is in the generation/processing of loop packets. As mentioned above 
> driver behaviour varies from driver to driver. For example, the vantage 
> driver obtains loop packets every 2 odd seconds; if a loop packet is missed 
> it is gone forever. Other drivers poll the hardware much less frequently, 
> say every 50 odd seconds, in that case there could be an entire minute 
> might go by with no data. The consequences of a missed loop packet depends 
> on the system config. A vantage station with a five minute archive interval 
> would see around 120 loop packets per archive period, so the loss of one 
> loop packet will have no real impact. Consider the second system with loop 
> packets arriving every, say, 50 seconds; if it had an archive interval of 
> one minute, you could conceivably see no loop packets in an archive 
> interval and hence no archive record is generated and no report cycle 
> occurs.
>  
> Remember loop packet data is accumulated by WeeWX and many obs in the 
> resulting archive record are simply the average value of the obs from all 
> loop packets seen during the archive interval. So for slow changing obs, 
> such as air temperature/atmospheric pressure, losing the odd loop packet 
> during an archive interval will have no real impact on the archive record 
> data provided there are numerous other loop packets received in the archive 
> interval. This may not be the case for rapidly changing obs such as wind 
> speed and direction or cumulative obs such as rainfall, in these cases it 
> may be important not to lose loop packets.
>
> One way to deal with sources that have significant latency is to place the 
> code that interacts with the source in it's own thread, this usually 
> entails one or more queues to pass data to the driver/service which adds 
> complexity. The non-threaded approach has simplicity, but risks delaying 
> the WeeWX main loop.
>
>
> I think what is happening is I usually get very fast reads from the 
> arduino but once in a while I get a really slow one.       Could be 
> something independent like me reading the database while weewx is trying to 
> write it?
>
>  
> Possible I guess, if WeeWX was doing some substantial report generation at 
> the time, otherwise not likely. Do the slow response occasions align with 
> WeeWX report generation? What happens if you take WeeWX out of the equation 
> and run a simple python script to poll the Arduino every so many seconds 
> and output the time taken to obtain a response?
>
> Gary
>

-- 
You received this message because you are subscribed to the Google Groups 
"weewx-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to weewx-user+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/weewx-user/aa82960b-de32-4e5a-862f-c86642486c05n%40googlegroups.com.
"""
the previous weewx_atlas.py adafruit version was a
modified weewx-sdr driver with AsyncReader with a queue
which merged stdout from rtl_433 over usb and wpa arduino over usb;
deprecated 4/28/23 at advice of GJR (Gary) on weewx blog;
the original unmodified driver is now used and my stuff was moved to this new service;
the new service has same name as original driver weewx_atlas.py (this file);

the new custom service is AddAtlasInside; see [Engine][[Services]] section at end under
         data_services = user.weewx_atlas.AddAtlasInside,

AddAtlasInside processes NEW_LOOP_PACKET events only;
(A) get_wpa_arduino_data();
    read line from usb serial port and json.loads();
    return packeti which is a json dict of inside weather;
    does not need to contain time or usunits;
(B) new_loop_packet();
    packeto = event.packet.copy();
    packeti = get_wpa_arduino_data();
    a few edits to packeti; 
    event.packet.update(packeti);
    (update is a standard dict() member that merges two dicts);
(C) WeeWX engine accumulates (averages) packets when running callback for NEW_ARCHIVE_RECORD;

GJR comments;

It is important that the function download_total_power() does not delay very long
because it will sit right in the main loop of the WeeWX engine.
If it is going to cause a delay of more than a couple seconds
you might want to put it in a separate thread and
feed the results to AddElectricity through a queue.

Why not run a standard sdr driver to feed WeeWX with loop packets from the Atlas
with a simple, non-threaded data service bound to the new loop packet arrival
to read the indoor data (pressure, temperature and humidity) and augment the loop packet.
Far more modular and easier to test/develop (and get help),
you will be running a standard sdr driver,
and since you are already getting your hands dirty modifying the sdr driver,
writing a small data service to handle the Arduino input should be a walk in the park.
If the Arduino is serially connected to the RPi
you should not have great latency in accessing data,
so a suitably short timeout on the serial reads should provide you with your
indoor data without blocking the main WeeWX thread.
Once proved, the serial read could be moved to a thread if you really had the need.
"""

from __future__ import with_statement
import signal
import sys
import time
from datetime import datetime
import argparse
import logging
import logging.handlers
import serial

import weewx.units
import weecfg
import weeutil.logger
import weeutil.weeutil
from weeutil.weeutil import to_int
from weewx.engine import StdService, StdEngine

try:
    import cjson as json
    setattr(json, 'dumps', json.encode)
    setattr(json, 'loads', json.decode)
except (ImportError, AttributeError):
    try:
        import simplejson as json
    except ImportError:
        import json

log = logging.getLogger(__name__)


SERVICE_VERSION = '0.00'
DEFAULT_CONFIG='/etc/weewx/weewx_atlas.conf'
# thp stands for indoors; temp; humidity; pressure;
# DEFAULT_FNAME_USB_THP = '/dev/ttyACM0'
DEFAULT_FNAME_USB_THP = '/dev/ttyACMwa'
DEFAULT_DEBUG = 1
READ_ATTEMPT_MAX = 70


class AddAtlasInside(StdService):
    """
    service that processes NEW_LOOP_PACKET events;
    (1) get_wpa_arduino_data() returns partial packet of indoor data packeti;
    (2) new_loop_packet() input is packeto (outdoor data packet); update (merge) with packeti;
    """

    def __init__(self, engine, config_dict):
        super().__init__(engine, config_dict)

        # Bind to any weewx.NEW_LOOP_PACKET events
        self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)

        archive_dict = config_dict.get('StdArchive', {})
        self._archive_interval = to_int(archive_dict.get('archive_interval', 300))
        log.debug('_archive_interval = %d', self._archive_interval)

        # e.g. serial usb port == ttyACM0 or ttyACMwa
        sdr_dict = config_dict.get('SDR', {})
        self._fname_usb_thp = sdr_dict.get('fname_usb_thp', DEFAULT_FNAME_USB_THP)
        log.debug('_fname_usb_thp = %s', self._fname_usb_thp)

        # open serial port
        self._fptr_usb_thp = None
        try:
            # timeout = 14
            timeout = 10
            self._fptr_usb_thp = serial.Serial(port=self._fname_usb_thp,
                                              baudrate=115200,
                                              timeout=timeout)
        except (OSError, ValueError) as exc:
            log.debug('startup failed')
            log.debug('exc= %s', str(exc))
            raise weewx.WeeWxIOError(f"failed to start AddAtlasInside; {exc}")

        log.debug('AddAtlasInside startup passed')

    def shutDown(self):
        """close _fptr_usb_thp ttyACM* serial port;"""
        log.debug('AddAtlasInside.shutDown()')
        self._fptr_usb_thp.close()
        log.info('shutdown complete')

    def read_line_or_none(self):
        """readline() timed out unless it returns a non empty line ending with \n;"""
        line = self._fptr_usb_thp.readline()
        if not line:
            return None
        if not line[-1:] == b'\n':
            return None
        return line

    def get_wpa_arduino_data(self):
        """
        (1) read line from arduino; blocking with timeout of serial port;
        (2) parse json;
        (3) return partial packet dict;
        """
        log.debug('----------------------------- get_wpa_arduino_data()')
        # flush input buffer;
        self._fptr_usb_thp.reset_input_buffer()
        # skip up to next \n; may be a partial line;
        skip = self.read_line_or_none()
        if not skip:
            log.error('get_wpa_arduino_data() skip partial line timeout')
            return {}
        # keep the first line that begins with { which is the sentinel for json input;
        done = False
        for attempt in range(READ_ATTEMPT_MAX):
            line = self.read_line_or_none()
            if not line:
                log.error('get_wpa_arduino_data() readline timeout')
                return {}
            if line[0:1] == b'{':
                done = True
                break
        if not done:
            return {}
        line = line.decode()
        try:
            log.debug('after attempt = %d', attempt)
            log.debug('get_wpa_arduino_data() json.loads(line)  line  =%s', str(line))
            packet = json.loads(line)
            log.debug('get_wpa_arduino_data() json.loads passed packet=%s', str(packet))
        except json.JSONDecodeError as exc:
            log.error('get_wpa_arduino_data() JSONDecodeError exc = %s', str(exc))
            log.error('line = %s', line)
            return {}
        return packet

    def new_loop_packet(self, event):
        """
        callback bound to NEW_LOOP_PACKET events;
        (1) just a copy event.packet to packeto for reference; outdoors weather data;
        (2) get_wpa_arduino_data() returns new packeti dictionary of indoors weather data
            which is either complete (100%) or empty;
        (3) make minor changes to packeti;
        (4) update event.packet merging with packeti

        NEW_LOOP_PACKET event.packet should already have these two (key: value) pairs;
            "dateTime": 12345678.12345, "usUnits": 1,
        """
        log.debug('new_loop_packet() begin ===============================================')
        time_packet1 = event.packet['dateTime']
        time_packet2 = datetime.utcfromtimestamp(time_packet1).strftime('%Y-%m-%d %H:%M:%S')
        log.debug('time_packet1=%d time_packet2=%s', time_packet1, time_packet2)
        interval_t1 = weeutil.weeutil.startOfInterval(time_packet1, self._archive_interval)
        interval_t2 = interval_t1 + self._archive_interval
        log.debug('interval_t1=%d interval_t2=%d', interval_t1, interval_t2)

        # inside
        packeti = self.get_wpa_arduino_data()
        # outside
        packeto = event.packet.copy()

        # minor changes to packeti
        # for this specific weather station we assign arbitrary caseTemp1 caseHumid1 cpuTemp1;
        if 'outTemp' in packeto.keys():
            packeti['caseTemp1' ] = packeto['outTemp']
            packeti['cpuTemp1'  ] = packeto['outTemp']
        if 'outHumdity' in packeto.keys():
            packeti['caseHumid1'] = packeto['outHumidity']
        # remove aaa_millis which was just for debugging;
        # you can view output of arduino wpa directly with
        # $ cat /dev/ttyACMwa
        if 'aaa_millis' in packeti.keys():
            del packeti['aaa_millis']

        # event.packet = merge packeti with packeto
        event.packet.update(packeti)

        log.debug('new_loop_packet(); packet inside;        packeti=%s', packeti)
        log.debug('new_loop_packet(); packet outside;       packeto=%s', packeto)
        log.debug('new_loop_packet(); final augmented; event.packet=%s', event.packet)


    # fake test packet
FAKE_PACKET = {
    'dateTime'    : int(time.time()),
    'usUnits'     : weewx.US,
    'windSpeed'   : 50.0,
    'windDir'     : 50.0,
    'outTemp'     : 50.0,
    'outHumidity' : 50.0,
    'lux'         : 50.0,
    'UV'          : 50.0,
    'rain_total'  : 50.0,
    'caseTemp1'   : 50.0,
    'caseHumid1'  : 50.0,
    'cpuTemp1'    : 50.0,
    'signal2'     : 50.0,
    'signal3'     : 50.0,
    'signal4'     : 50.0,
    'signal5'     : 50.0,
    'signal6'     : 50.0,
    'signal7'     : 50.0,
}

def process_options_get_config_dict():
    """
    parse command args and 
    (1) handle simplest options then exit or
    (2) return config_dict and continue
    """

    usage = """weewx_atlas [--debug] [--help] [--version]"""
    print('argparse.ArgumentParser')
    parser = argparse.ArgumentParser(description=usage)
    parser.add_argument('--version',
                        dest='version',
                        action='store_true',
                        help='display driver version')
    parser.add_argument('--debug',
                        dest='debug',
                        default=DEFAULT_DEBUG,
                        action='store_true',
                        help='display diagnostic information while running')
    parser.add_argument('--config',
                        default=DEFAULT_CONFIG,
                        action='store',
                        help='configuration file with sensor map')

    print('parser.parse_args')
    options = parser.parse_args()

    if options.version:
        print(f"weewx_atlas version {SERVICE_VERSION}")
        sys.exit(1)

    if options.debug:
        print('log.setLevel(logging.DEBUG)')
        log.setLevel(logging.DEBUG)

    print(f'{options.config=}')
    log.debug('options.config = %s', str(options.config))
    log.debug('reading config_dict')
    _, config_dict = weecfg.read_config(options.config)

    archive_interval_demo_use = True
    archive_interval_demo = 30     # seconds

    archive_dict = config_dict.get('StdArchive', {})
    archive_interval_in = archive_dict.get('archive_interval', None)
    if archive_interval_demo_use:
        log.debug('using archive_interval_demo; alter config;')
        archive_dict['archive_interval'] = str(archive_interval_demo)
        log.debug('archive_interval_in  = %s', str(archive_interval_in))
        log.debug('archive_interval_out = %s', str(archive_interval_demo))
    else:
        log.debug('using archive interval from options.config')
        log.debug('archive_interval_out = %s', str(archive_interval_in))
    return config_dict


def main():
    """
    test service; not used during weewx run; to run test do;
    PYTHONPATH=$PYTHONPATH:/usr/share/weewx python /usr/share/weewx/user/weewx_atlas.py [OPTIONS]
    """

    # signal handling
    # usb port close is probably already done by station destructor; just in case;
    svc = None

    def testing_handle_interrupt(signum, stack):
        """catch keyboard interrupts; clean up; close usb port;"""
        log.error('Handling interrupt.  Closing port.')
        print(f'{signum=}')
        print(f'{stack=}')
        svc.shutDown()
        log.error('closed port.  exiting.')
        sys.exit(0)

    signal.signal(signal.SIGTERM, testing_handle_interrupt)
    signal.signal(signal.SIGINT, testing_handle_interrupt)

    log.addHandler(logging.StreamHandler(sys.stdout))
    log.addHandler(logging.handlers.SysLogHandler(address="/dev/log"))
    print('log.setLevel INFO')
    log.setLevel(logging.INFO)

    config_dict = process_options_get_config_dict()

    log.debug('StdEngine constructor')
    eng = StdEngine(config_dict)

    log.debug('AddAtlasInside service constructor')
    svc = AddAtlasInside(eng, config_dict)

    while True:
        log.debug('main loop begin %s', 'M' * 70)
        # log.debug('sleeping archive_interval= %d', archive_interval)
        # time.sleep(archive_interval)
        event = weewx.Event(weewx.NEW_LOOP_PACKET)
        event.packet = FAKE_PACKET.copy()
        event.packet['dateTime'] = int(time.time())
        log.debug('begin event.packet = %s', str(event.packet))
        svc.new_loop_packet(event)
        log.debug('end   event.packet = %s', str(event.packet))
        log.debug('main loop end %s', 'M' * 70)


if __name__ == '__main__':
    main()

# eee eof

Reply via email to