I have been working on an MQTT driver for weewx, and have run into an issue with the system that I have created. I am fully aware that the issue that I'm facing is with my code, not the weewx engine, but I'm not a programmer by training, and am in need of assistance.
Code posted below, i apologize if it is difficult to follow. Essentially, i create a thread that runs the paho MQTT listener loop. When a message is received, it is put into a queue, and then picked up on the next loop packet to be processed by the Weewx engine. What is happening, and I'm not sure why, is the thread over the course of several days is being launched multiple times. Today, I had 8 of them running, and weewx has been running for 4 days. Also, weewx has been eating memory, when first launched, it takes roughly 250MB, when I reset it today, it was using nearly 6GB. Graphs of the memory usage are available at http://carlincomputing.duckdns.org/weewx/system.html. The drops in memory usage are when I restarted the service. I understand that this is probably outside of the scope of this group, but I'm not looking for you to solve my problem for me, but instead to help me troubleshoot it. THis is the first time I have worked with threads, and weewx also the only system that I have attempted to modify. I appreciate your time and recommendations and I'm happy to answer any questions that come up, #!/usr/bin/env python import weewx import syslog from weewx.engine import StdService import schemas.wview import paho.mqtt.client as mqttClient import time from Queue import Queue import threading import prctl def logmsg(dst, msg): syslog.syslog(dst, 'wxMesh: %s' % msg) def logdbg(msg): logmsg(syslog.LOG_DEBUG, msg) def loginf(msg): logmsg(syslog.LOG_INFO, msg) def logerr(msg): logmsg(syslog.LOG_ERR, msg) class add_mqtt_records(StdService): def __init__(self, engine, config_dict): #initialize my superclass first: super(add_mqtt_records, self).__init__(engine, config_dict) # Bind to any new archive record events: self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet) self.q = Queue() self.thread = threading.Thread(target=self.run, args=()) self.thread.daemon = True try: # Dig the needed options out of the configuration dictionary. # If a critical option is missing, an exception will be raised and # the alarm will not be set. self.mqtt = config_dict['MQTT'] # print "MQTT Data: " # print self.mqtt for sensor, value in self.mqtt.iteritems(): self.broker_address = self.mqtt['broker']['address'] print "Broker address: " + self.broker_address self.broker_port = self.mqtt['broker']['port'] print "Broker address: " + self.broker_address self.broker_user = self.mqtt['broker']['user'] print "Broker user: " + self.broker_user self.broker_password = self.mqtt['broker']['password'] print "Broker password: " + self.broker_password self.broker_session_id = self.mqtt['broker']['session_id'] print "Broker session ID: " + self.broker_session_id topics = self.mqtt['topics'] #print "Topics: " #print self.topics self.subscriptions = {} for sensor, topics in topics.iteritems(): #print "Sensor: " + sensor #print "Topics: " #print topics for topic, db_entry in topics.iteritems(): print "Subscribed to topic: " + topic # print "DB_Entry: " + db_entry self.subscriptions[sensor + topic] = db_entry syslog.syslog(syslog.LOG_INFO, "MQTT: Setup for devices") except KeyError as e: syslog.syslog(syslog.LOG_INFO, "MQTT: Not configured. Missing parameter: %s" % e) print "Connecting to Broker" self.client = mqttClient.Client() #str(self.broker_session_id), clean_session=False) self.client.username_pw_set(self.broker_user, password=self.broker_password) self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.client.on_disconnect = self.on_disconnect if self.broker_port == "": self.client.connect(self.broker_address) else: self.client.connect(self.broker_address, self.broker_port) print "Starting Thread" self.thread.start() def run(self): prctl.set_name("Weewx MQTT listner") self.client.loop_start() print "Running 'RUN' routine" def new_loop_packet(self, event): while not self.q.empty(): message = self.q.get() print "Topic: " + message.topic + "\tValue: " + message.payload print self.subscriptions[message.topic] if message.payload == 'nan': message.payload = None event.packet[self.subscriptions[message.topic]] = float(message.payload) #print "Something" def on_connect(self, client, userdata, flags, rc): if rc == 0: print("Connected to broker") self.Connected = True for topic, db_entry in self.subscriptions.iteritems(): print ("Subscribing to ", topic) self.client.subscribe(topic, 1) else: print("Connection failed") self.Connected = False def on_message(self, client, userdata, message): #self.data[message.topic] = message.payload self.q.put(message) print "Message " + message.payload + " in topic " + message.topic def on_disconnect(self, client, userdata, rc): #self.loop_stop() self.Connected = False Many thanks, and happy new year! Thomas -- You received this message because you are subscribed to the Google Groups "weewx-development" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/weewx-development/abe0db12-c4a7-4de3-99e2-e41efdd9a630%40googlegroups.com.
