Lee-W commented on code in PR #66584:
URL: https://github.com/apache/airflow/pull/66584#discussion_r3292205925


##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -64,6 +64,128 @@ event-driven scheduling, then a new trigger must be created.
 This new trigger must inherit ``BaseEventTrigger`` and ensure it properly 
works with event-driven scheduling.
 It might inherit from the existing trigger as well if both triggers share some 
common code.
 
+Sharing one poll across sibling triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3
+
+When several ``AssetWatcher`` instances on different assets back triggers that 
read from the **same upstream resource**
+— a directory of flag files, a polling REST endpoint, a Kafka topic with 
auto-commit, and similar idempotent or
+subscriber-side-effect sources — the triggerer would otherwise spin up one 
independent poll loop per trigger. For a
+shared source with twenty subscribers that means twenty poll loops, twenty 
connections, twenty sets of API calls per
+cadence. See "Suitable upstreams" below for the precise scope.
+
+``BaseEventTrigger`` supports an opt-in path so that sibling triggers share a 
single underlying poll, while each
+trigger keeps its own DB row, its own ``run_trigger`` task, and its own 
per-instance filtering. To participate, a
+subclass overrides three hooks:
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` — 
return a key identifying the shared
+  upstream (typically a tuple of strings). Triggers whose key compares equal 
will share one poll. Returning ``None``
+  (the default) opts out — the trigger runs its own independent ``run()`` 
loop, exactly as before. The return value
+  is read **once** when the triggerer starts this trigger; changing it 
mid-lifetime has no effect on group
+  membership, so siblings that should share a poll must return the same key 
from the outset.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream` — a 
``@classmethod`` coroutine the triggerer
+  drives **once per shared-stream group** to yield raw events from the 
upstream. Because the triggerer reuses one
+  trigger's kwargs to drive the shared poll, only rely on fields whose values 
participate in ``shared_stream_key``.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` — an 
instance method that consumes the
+  broadcast raw stream and yields the ``TriggerEvent`` instances this trigger 
should fire. Per-trigger filtering
+  (e.g. only events matching this instance's ``filename``) lives here.
+
+Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag 
file appears in a shared inbox directory:
+
+.. code-block:: python
+
+    from collections.abc import AsyncIterator, Hashable
+    from typing import Any
+
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+
+    class DirectoryFileDeleteTrigger(BaseEventTrigger):
+        def __init__(self, *, directory, filename, poke_interval=5.0):
+            super().__init__()
+            self.directory = directory
+            self.filename = filename
+            self.poke_interval = poke_interval
+
+        def shared_stream_key(self) -> Hashable | None:
+            # All triggers on the same directory + cadence share one scan.
+            return ("directory-scan", self.directory, self.poke_interval)

Review Comment:
   Added a "key must be deterministic" note. Thanks!
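
For context, here is a minimal sketch of why determinism matters. It assumes the triggerer groups siblings by comparing the values returned from `shared_stream_key`, as the quoted docs describe. The two toy classes and the `group_by_key` helper are hypothetical stand-ins for illustration, not Airflow code.

```python
from __future__ import annotations

import itertools
from collections import defaultdict
from collections.abc import Hashable


class StableKeyTrigger:
    """Toy stand-in for a trigger (hypothetical, not an Airflow class)."""

    def __init__(self, directory: str, filename: str, poke_interval: float = 5.0):
        self.directory = directory
        self.filename = filename
        self.poke_interval = poke_interval

    def shared_stream_key(self) -> Hashable | None:
        # Built only from constructor arguments: equal inputs give equal keys.
        return ("directory-scan", self.directory, self.poke_interval)


class UnstableKeyTrigger(StableKeyTrigger):
    """Anti-pattern: the key changes on every call."""

    _counter = itertools.count()

    def shared_stream_key(self) -> Hashable | None:
        # A process-wide counter makes every returned key unique.
        return ("directory-scan", self.directory, next(self._counter))


def group_by_key(triggers: list) -> dict[Hashable, list]:
    """Hypothetical grouping step: one shared poll per distinct key."""
    groups: dict[Hashable, list] = defaultdict(list)
    for trigger in triggers:
        key = trigger.shared_stream_key()
        if key is None:
            # Opted out: the trigger keeps its own independent loop.
            groups[("solo", id(trigger))].append(trigger)
        else:
            groups[key].append(trigger)
    return dict(groups)


stable = [StableKeyTrigger("/inbox", f"asset_{i}.flag") for i in range(3)]
unstable = [UnstableKeyTrigger("/inbox", f"asset_{i}.flag") for i in range(3)]

stable_polls = len(group_by_key(stable))
unstable_polls = len(group_by_key(unstable))
```

Under these assumptions, the three stable triggers collapse into a single group, while the three unstable ones end up in three separate groups and lose the benefit of sharing.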



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
