I have been working on an MQTT driver for weewx, and have run into an issue 
with the system that I have created.  I am fully aware that the issue that 
I'm facing is with my code, not the weewx engine, but I'm not a programmer 
by training, and am in need of assistance.  

Code posted below, i apologize if it is difficult to follow.  Essentially, 
i create a thread that runs the paho MQTT listener loop.  When a message is 
received, it is put into a queue, and then picked up on the next loop 
packet to be processed by the Weewx engine.  

What is happening, and I'm not sure why, is the thread over the course of 
several days is being launched multiple times.  Today, I had 8 of them 
running, and weewx has been running for 4 days.  Also, weewx has been 
eating memory, when first launched, it takes roughly 250MB, when I reset it 
today, it was using nearly 6GB.

Graphs of the memory usage are available at 
http://carlincomputing.duckdns.org/weewx/system.html.  The drops in memory 
usage are when I restarted the service.

I understand that this is probably outside of the scope of this group, but 
I'm not looking for you to solve my problem for me, but instead to help me 
troubleshoot it.  THis is the first time I have worked with threads, and 
weewx also the only system that I have attempted to modify.

I appreciate your time and recommendations and I'm happy to answer any 
questions that come up,


#!/usr/bin/env python                                                      
                                                                            

import weewx
import syslog
from weewx.engine import StdService
import schemas.wview
import paho.mqtt.client as mqttClient
import time
from Queue import Queue
import threading
import prctl

def logmsg(dst, msg):
    syslog.syslog(dst, 'wxMesh: %s' % msg)

def logdbg(msg):
    logmsg(syslog.LOG_DEBUG, msg)

def loginf(msg):
    logmsg(syslog.LOG_INFO, msg)

def logerr(msg):
    logmsg(syslog.LOG_ERR, msg)

class add_mqtt_records(StdService):

    def __init__(self, engine, config_dict):
        #initialize my superclass first:                                    
                                                                            
      
        super(add_mqtt_records, self).__init__(engine, config_dict)

        # Bind to any new archive record events:                            
                                                                           
        self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)

        self.q = Queue()

        self.thread = threading.Thread(target=self.run, args=())

        self.thread.daemon = True

        try:
           # Dig the needed options out of the configuration dictionary.    
                                                                          
            # If a critical option is missing, an exception will be raised 
and                                                                         
            # the alarm will not be set.                                    
                                                                           
            self.mqtt    = config_dict['MQTT']

#            print "MQTT Data: "                                            
                                                                           
#            print self.mqtt                                                
                                                                           


            for sensor, value in self.mqtt.iteritems():
                self.broker_address = self.mqtt['broker']['address']
                print "Broker address: " + self.broker_address

                self.broker_port = self.mqtt['broker']['port']
                print "Broker address: " + self.broker_address

                self.broker_user = self.mqtt['broker']['user']
                print "Broker user: " + self.broker_user

                self.broker_password = self.mqtt['broker']['password']
                print "Broker password: " + self.broker_password


                self.broker_session_id = self.mqtt['broker']['session_id']
                print "Broker session ID: " + self.broker_session_id

                topics = self.mqtt['topics']
                #print "Topics: "                                          
                                                                            
                #print self.topics                                          
                                                                           

                self.subscriptions = {}
                for sensor, topics in topics.iteritems():
                    #print "Sensor: " + sensor                              
                                                                           

                    #print "Topics: "                                      
                                                                            
                    #print topics                                          
                                                                            
                    for topic, db_entry in topics.iteritems():
                        print "Subscribed to topic: " + topic
                    #    print "DB_Entry: " + db_entry                      
                                                                           
                        self.subscriptions[sensor + topic] = db_entry

           syslog.syslog(syslog.LOG_INFO, "MQTT: Setup for devices")
        except KeyError as e:
            syslog.syslog(syslog.LOG_INFO, "MQTT: Not configured.  Missing 
parameter: %s" % e)

        print "Connecting to Broker"

        self.client = mqttClient.Client() #str(self.broker_session_id), 
clean_session=False)                                                        
   
        self.client.username_pw_set(self.broker_user, 
password=self.broker_password)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect

        if self.broker_port == "":
            self.client.connect(self.broker_address)

        else:
            self.client.connect(self.broker_address, self.broker_port)

        print "Starting Thread"
        self.thread.start()

    def run(self):

        prctl.set_name("Weewx MQTT listner")

        self.client.loop_start()
        print "Running 'RUN' routine"

    def new_loop_packet(self, event):

        while not self.q.empty():
            message = self.q.get()
            print "Topic: " + message.topic + "\tValue: " + message.payload

            print self.subscriptions[message.topic]
            if message.payload == 'nan':
                message.payload = None

            event.packet[self.subscriptions[message.topic]] = 
float(message.payload)
            #print "Something"                                              
                                                                           

    def on_connect(self, client, userdata, flags, rc):

        if rc == 0:

            print("Connected to broker")
            self.Connected = True

            for topic, db_entry in self.subscriptions.iteritems():
                print ("Subscribing to ", topic)
                self.client.subscribe(topic, 1)

        else:

            print("Connection failed")
            self.Connected = False

    def on_message(self, client, userdata, message):
        #self.data[message.topic] = message.payload                        
                                                                            
        self.q.put(message)
        print "Message " + message.payload + " in topic " + message.topic

    def on_disconnect(self, client, userdata, rc):
        #self.loop_stop()                                                  
                                                                            
        self.Connected = False
 

Many thanks, and happy new year!
Thomas


-- 
You received this message because you are subscribed to the Google Groups 
"weewx-development" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/weewx-development/abe0db12-c4a7-4de3-99e2-e41efdd9a630%40googlegroups.com.

Reply via email to