Christopher de Vidal wrote:

> Help please? Creating an MQTT-to-Firestore bridge and I know a decorator
> would help but I'm stumped how to create one. I've used decorators before
> but not with arguments.
>
> The Firestore collection.on_snapshot() method invokes a callback and sends
> it three parameters (collection_snapshot, changes, and read_time). I need
> the callback to also know the name of the collection so that I can publish
> to the equivalent MQTT topic name. I had thought to add a fourth parameter
> and I believe a decorator is the right approach but am stumped how to add
> that fourth parameter. How would I do this with the code below?
>
> #!/usr/bin/env python3
> from google.cloud import firestore
> import firebase_admin
> from firebase_admin import credentials
> import json
> import mqtt
>
> firebase_admin.initialize_app(credentials.Certificate("certs/firebase.json"))
> db = firestore.Client()
> mqtt.connect()
>
>
> def load_json(contents):
>     try:
>         return json.loads(contents)
>     except (json.decoder.JSONDecodeError, TypeError):
>         return contents
>
>
> def on_snapshot(col_name, col_snapshot, changes, read_time):
>     data = dict()
>     for doc in col_snapshot:
>         serial = doc.id
>         contents = load_json(doc.to_dict()['value'])
>         data[serial] = contents
>     for change in changes:
>         serial = change.document.id
>         mqtt_topic = col_name + '/' + serial
>         contents = data[serial]
>         if change.type.name in ['ADDED', 'MODIFIED']:
>             mqtt.publish(mqtt_topic, contents)
>         elif change.type.name == 'REMOVED':
>             mqtt.publish(mqtt_topic, None)
>
>
> # Start repeated code section
> # TODO Better to use decorators but I was stumped on how to pass arguments
> def door_status_on_snapshot(col_snapshot, changes, read_time):
>     on_snapshot('door_status', col_snapshot, changes, read_time)
>
>
> door_status_col_ref = db.collection('door_status')
> door_status_col_watch = door_status_col_ref.on_snapshot(door_status_on_snapshot)
>
> # Repetition...
> def cpu_temp_on_snapshot(col_snapshot, changes, read_time):
>     on_snapshot('cpu_temp', col_snapshot, changes, read_time)
>
>
> cpu_temp_col_ref = db.collection('cpu_temp')
> cpu_temp_col_watch = cpu_temp_col_ref.on_snapshot(cpu_temp_on_snapshot)
> # End repeated code section
>
> # Start repeated code section
> door_status_col_watch.unsubscribe()
> cpu_temp_col_watch.unsubscribe()
> # Repetition...
> # End repeated code section
>
> Christopher de Vidal

You might also consider a contextmanager:

https://docs.python.org/3/library/contextlib.html

# untested
from contextlib import ExitStack, contextmanager

@contextmanager
def subscribe(name):
    # The closure remembers the collection name and passes it on
    # as the extra first argument of on_snapshot().
    def status_on_snapshot(col_snapshot, changes, read_time):
        on_snapshot(name, col_snapshot, changes, read_time)

    status_col_ref = db.collection(name)
    status_col_watch = status_col_ref.on_snapshot(status_on_snapshot)
    try:
        yield status_col_ref
    finally:
        # Runs when the with block exits, even after an exception.
        status_col_watch.unsubscribe()


with subscribe("door_status") as door_status_col_ref:
    with subscribe("cpu_temp") as cpu_temp_col_ref:
        ...

If there are many uniform collections, the nested with statements can be
generalized with an ExitStack:

NAMES = "door_status", "cpu_temp", ...
with ExitStack() as stack:
    col_refs = [
        stack.enter_context(subscribe(name)) for name in NAMES
    ]

And if you like Cameron's suggestion, or the subscribe() generator above just
gets too unwieldy: a custom class can act as a contextmanager, too.

https://docs.python.org/3/reference/compound_stmts.html#with
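
To illustrate the last point, here is a rough sketch of what such a class might look like. It is not tested against Firestore. The class name Subscription and its db and callback parameters are made up for this example; the only thing it assumes about db is that it offers collection(name).on_snapshot(callback) returning an object with unsubscribe(), as in the code quoted above.

```python
class Subscription:
    """Watch one collection; unsubscribe when the with block exits."""

    def __init__(self, db, name, callback):
        self.db = db              # assumed: a firestore.Client-like object
        self.name = name          # collection name, reused as the MQTT topic prefix
        self.callback = callback  # called as callback(name, col_snapshot, changes, read_time)
        self.watch = None

    def _on_snapshot(self, col_snapshot, changes, read_time):
        # Forward the three arguments from on_snapshot() plus the collection name.
        self.callback(self.name, col_snapshot, changes, read_time)

    def __enter__(self):
        col_ref = self.db.collection(self.name)
        self.watch = col_ref.on_snapshot(self._on_snapshot)
        return col_ref

    def __exit__(self, exc_type, exc_value, traceback):
        self.watch.unsubscribe()
        return False  # do not swallow exceptions raised inside the with block
```

It would be used like the generator version, for example "with Subscription(db, "door_status", on_snapshot) as door_status_col_ref: ...", and instances can be passed to ExitStack.enter_context() in the same way. The bound method self._on_snapshot plays the role of the closure: it carries the collection name along, so no per-collection wrapper function is needed.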