Lee-W commented on code in PR #66584:
URL: https://github.com/apache/airflow/pull/66584#discussion_r3266393917
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -64,6 +64,97 @@ event-driven scheduling, then a new trigger must be created.
This new trigger must inherit ``BaseEventTrigger`` and ensure it properly
works with event-driven scheduling.
It might inherit from the existing trigger as well if both triggers share some
common code.
+Sharing one poll across sibling triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3
+
+When several ``AssetWatcher`` instances on different assets are backed by triggers that read from the
+**same upstream resource** (one SQS queue, one Kafka topic, one directory of flag files), the triggerer would
+otherwise spin up one independent poll loop per trigger. For a shared queue with twenty subscribers, that means
+twenty consumers, twenty connections, and twenty sets of API calls per poll cycle.
+
+``BaseEventTrigger`` supports an opt-in path that lets sibling triggers share a single underlying poll, while each
+trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a
+subclass overrides three hooks:
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key`: return a key identifying the shared
+  upstream (typically a tuple of strings). Triggers whose keys compare equal share one poll. Returning ``None``
+  (the default) opts out, and the trigger runs its own independent ``run()`` loop exactly as before.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream`: a ``@classmethod`` coroutine that the
+  triggerer drives **once per shared-stream group** to yield raw events from the upstream. Because the triggerer
+  reuses one trigger's kwargs to drive the shared poll, rely only on fields whose values participate in
+  ``shared_stream_key``; any other field may differ between members of the group.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream`: an instance method that consumes the
+  broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering
+  (e.g. only events matching this instance's ``filename``) lives here.
+
+Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag file appears in a shared inbox directory:
+
+.. code-block:: python
+
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+
+    class DirectoryFileDeleteTrigger(BaseEventTrigger):
+        def __init__(self, *, directory, filename, poke_interval=5.0):
+            super().__init__()
+            self.directory = directory
+            self.filename = filename
+            self.poke_interval = poke_interval
+
+        def shared_stream_key(self):
Review Comment:
fixed
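A minimal, Airflow-free sketch of the three hooks described in the hunk above: group triggers by `shared_stream_key()`, treat `None` as opting out, and drive one upstream iterator whose items are copied to every group member's `filter_shared_stream`. `SketchTrigger`, `group_by_shared_stream`, `broadcast` and the `share` flag are hypothetical names for illustration only; this is not the triggerer's actual implementation, which the excerpt does not show.

```python
import asyncio
from collections import defaultdict


class SketchTrigger:
    """Hypothetical stand-in for a BaseEventTrigger subclass (no Airflow import needed)."""

    def __init__(self, directory, filename, poke_interval=5.0, share=True):
        self.directory = directory
        self.filename = filename
        self.poke_interval = poke_interval
        self.share = share  # hypothetical switch used only to demonstrate opting out

    def shared_stream_key(self):
        # None opts out; equal keys mean "same upstream, same cadence".
        if not self.share:
            return None
        return ("directory-scan", self.directory, self.poke_interval)

    async def filter_shared_stream(self, shared_stream):
        # Keep only the raw items this particular trigger cares about.
        async for name in shared_stream:
            if name == self.filename:
                yield {"filename": name}


def group_by_shared_stream(triggers):
    """Split triggers into shared-stream groups and independent (opted-out) triggers."""
    groups = defaultdict(list)
    independent = []
    for trigger in triggers:
        key = trigger.shared_stream_key()
        if key is None:
            independent.append(trigger)
        else:
            groups[key].append(trigger)  # grouping via a dict assumes hashable keys
    return dict(groups), independent


async def broadcast(upstream, consumers):
    """Drive ONE upstream async iterator and copy every raw item to each consumer's filter."""
    queues = [asyncio.Queue() for _ in consumers]
    done = object()  # sentinel marking the end of the upstream

    async def feed():
        async for item in upstream:
            for queue in queues:
                queue.put_nowait(item)
        for queue in queues:
            queue.put_nowait(done)

    async def drain(queue):
        # Re-expose one queue as the async iterator a filter expects.
        while (item := await queue.get()) is not done:
            yield item

    async def collect(consumer, queue):
        return [event async for event in consumer.filter_shared_stream(drain(queue))]

    feeder = asyncio.create_task(feed())
    results = await asyncio.gather(*(collect(c, q) for c, q in zip(consumers, queues)))
    await feeder
    return results
```

Each member gets its own unbounded `asyncio.Queue`, so a slow filter only buffers items instead of stalling its siblings; a production fan-out would probably want bounded queues or an explicit backpressure policy, and the excerpt does not say which approach Airflow takes.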
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -64,6 +64,97 @@ event-driven scheduling, then a new trigger must be created.
This new trigger must inherit ``BaseEventTrigger`` and ensure it properly
works with event-driven scheduling.
It might inherit from the existing trigger as well if both triggers share some
common code.
+Sharing one poll across sibling triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3
+
+When several ``AssetWatcher`` instances on different assets are backed by triggers that read from the
+**same upstream resource** (one SQS queue, one Kafka topic, one directory of flag files), the triggerer would
+otherwise spin up one independent poll loop per trigger. For a shared queue with twenty subscribers, that means
+twenty consumers, twenty connections, and twenty sets of API calls per poll cycle.
+
+``BaseEventTrigger`` supports an opt-in path that lets sibling triggers share a single underlying poll, while each
+trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a
+subclass overrides three hooks:
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key`: return a key identifying the shared
+  upstream (typically a tuple of strings). Triggers whose keys compare equal share one poll. Returning ``None``
+  (the default) opts out, and the trigger runs its own independent ``run()`` loop exactly as before.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream`: a ``@classmethod`` coroutine that the
+  triggerer drives **once per shared-stream group** to yield raw events from the upstream. Because the triggerer
+  reuses one trigger's kwargs to drive the shared poll, rely only on fields whose values participate in
+  ``shared_stream_key``; any other field may differ between members of the group.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream`: an instance method that consumes the
+  broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering
+  (e.g. only events matching this instance's ``filename``) lives here.
+
+Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag file appears in a shared inbox directory:
+
+.. code-block:: python
+
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+
+    class DirectoryFileDeleteTrigger(BaseEventTrigger):
+        def __init__(self, *, directory, filename, poke_interval=5.0):
+            super().__init__()
+            self.directory = directory
+            self.filename = filename
+            self.poke_interval = poke_interval
+
+        def shared_stream_key(self):
+            # All triggers on the same directory + cadence share one scan.
+            return ("directory-scan", self.directory, self.poke_interval)
+
+        @classmethod
+        async def open_shared_stream(cls, kwargs):
+            # Drives one directory listing loop per group.
+            ...
+
+        async def filter_shared_stream(self, shared_stream):
Review Comment:
fixed
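A sketch of how the elided `open_shared_stream` body and the `filter_shared_stream` body of the `DirectoryFileDeleteTrigger` example might be filled in. Assumptions not confirmed by the excerpt: `kwargs` is a dict of the constructor arguments, each raw event is a set of filenames from one `os.listdir()` scan, and the trigger deletes its flag file when it fires (suggested only by the class name). `TriggerEvent` here is a local stand-in dataclass and the class does not subclass `BaseEventTrigger`, so the sketch runs without Airflow installed.

```python
import asyncio
import os
from dataclasses import dataclass, field


@dataclass
class TriggerEvent:
    """Local stand-in for airflow.triggers.base.TriggerEvent (assumed: payload only)."""

    payload: dict = field(default_factory=dict)


class DirectoryFileDeleteTrigger:  # in Airflow this would subclass BaseEventTrigger
    def __init__(self, *, directory, filename, poke_interval=5.0):
        self.directory = directory
        self.filename = filename
        self.poke_interval = poke_interval

    def shared_stream_key(self):
        # All triggers on the same directory + cadence share one scan.
        return ("directory-scan", self.directory, self.poke_interval)

    @classmethod
    async def open_shared_stream(cls, kwargs):
        # One listing loop per group. kwargs may come from ANY member of the group,
        # so read only fields that are part of shared_stream_key (never "filename").
        directory = kwargs["directory"]
        poke_interval = kwargs["poke_interval"]
        while True:
            # Assumed raw-event shape: one snapshot (set of filenames) per scan.
            yield set(os.listdir(directory))
            await asyncio.sleep(poke_interval)

    async def filter_shared_stream(self, shared_stream):
        # Per-instance filtering: only this trigger's own flag file matters.
        async for snapshot in shared_stream:
            if self.filename not in snapshot:
                continue
            path = os.path.join(self.directory, self.filename)
            try:
                # Assumed "delete" step so the same flag file does not fire again.
                os.remove(path)
            except FileNotFoundError:
                # Removed between the scan and now; skip it rather than fail.
                continue
            yield TriggerEvent({"filename": self.filename, "path": path})
```

Yielding whole snapshots rather than one event per file keeps the shared loop unaware of which filenames matter; each member decides in its own filter, which is the split between the two hooks that the docs describe.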
--
This is an automated message from the Apache Git Service.