Lee-W commented on code in PR #66584:
URL: https://github.com/apache/airflow/pull/66584#discussion_r3292205925
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -64,6 +64,128 @@ event-driven scheduling, then a new trigger must be created.
 This new trigger must inherit ``BaseEventTrigger`` and ensure it properly works with event-driven scheduling.
 It might inherit from the existing trigger as well if both triggers share some common code.
+Sharing one poll across sibling triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3
+
+When several ``AssetWatcher`` instances on different assets back triggers that read from the **same upstream resource**
+(a directory of flag files, a polling REST endpoint, a Kafka topic with auto-commit, and similar idempotent or
+subscriber-side-effect sources), the triggerer would otherwise spin up one independent poll loop per trigger. For a
+shared source with twenty subscribers that means twenty poll loops, twenty connections, twenty sets of API calls per
+cadence. See "Suitable upstreams" below for the precise scope.
+
+``BaseEventTrigger`` supports an opt-in path so that sibling triggers share a single underlying poll, while each
+trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a
+subclass overrides three hooks:
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` -- return a key identifying the shared
+  upstream (typically a tuple of strings). Triggers whose key compares equal will share one poll. Returning ``None``
+  (the default) opts out: the trigger runs its own independent ``run()`` loop, exactly as before. The return value
+  is read **once** when the triggerer starts this trigger; changing it mid-lifetime has no effect on group
+  membership, so siblings that should share a poll must return the same key from the outset.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream` -- a ``@classmethod`` coroutine the triggerer
+  drives **once per shared-stream group** to yield raw events from the upstream. Because the triggerer reuses one
+  trigger's kwargs to drive the shared poll, only rely on fields whose values participate in ``shared_stream_key``.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` -- an instance method that consumes the
+  broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering
+  (e.g. only events matching this instance's ``filename``) lives here.
+
+Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag file appears in a shared inbox directory:
+
+.. code-block:: python
+
+    from collections.abc import AsyncIterator, Hashable
+    from typing import Any
+
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+
+    class DirectoryFileDeleteTrigger(BaseEventTrigger):
+        def __init__(self, *, directory, filename, poke_interval=5.0):
+            super().__init__()
+            self.directory = directory
+            self.filename = filename
+            self.poke_interval = poke_interval
+
+        def shared_stream_key(self) -> Hashable | None:
+            # All triggers on the same directory + cadence share one scan.
+            return ("directory-scan", self.directory, self.poke_interval)
Review Comment:
Added a "key must be deterministic" note. Thanks!
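
For anyone following along, here is a rough standalone sketch of why the key has to be deterministic. It is plain Python with no Airflow imports; `SketchTrigger`, `group_by_key`, `poll_once`, and `run_group` are hypothetical names made up for illustration, and this is only an approximation of the grouping idea described in the docs above, not the triggerer's actual implementation.

```python
from __future__ import annotations

import asyncio
from collections import defaultdict
from collections.abc import Hashable


class SketchTrigger:
    """Hypothetical stand-in for a trigger; not an Airflow class."""

    def __init__(self, *, directory: str, filename: str, poke_interval: float = 5.0):
        self.directory = directory
        self.filename = filename
        self.poke_interval = poke_interval

    def shared_stream_key(self) -> Hashable | None:
        # Deterministic: built only from constructor arguments, so two
        # triggers with equal arguments always produce equal keys.
        return ("directory-scan", self.directory, self.poke_interval)

    def wants(self, raw_event: str) -> bool:
        # Per-instance filtering applied to the broadcast raw events.
        return raw_event == self.filename


def group_by_key(triggers: list[SketchTrigger]) -> dict[Hashable, list[SketchTrigger]]:
    """Read each key once and bucket triggers whose keys compare equal."""
    groups: dict[Hashable, list[SketchTrigger]] = defaultdict(list)
    for trigger in triggers:
        groups[trigger.shared_stream_key()].append(trigger)
    return dict(groups)


async def poll_once(directory: str) -> list[str]:
    # Placeholder for a real directory scan; returns fixed file names.
    await asyncio.sleep(0)
    return ["a.flag", "b.flag"]


async def run_group(key: tuple, members: list[SketchTrigger]) -> dict[str, list[str]]:
    """Poll once for the whole group, then fan raw events out to each member."""
    _, directory, _ = key
    raw_events = await poll_once(directory)
    return {m.filename: [e for e in raw_events if m.wants(e)] for m in members}


triggers = [
    SketchTrigger(directory="/inbox", filename="a.flag"),
    SketchTrigger(directory="/inbox", filename="b.flag"),
    SketchTrigger(directory="/other", filename="a.flag"),
]
groups = group_by_key(triggers)
inbox_key = ("directory-scan", "/inbox", 5.0)
fired = asyncio.run(run_group(inbox_key, groups[inbox_key]))
```

In this sketch the two `/inbox` triggers land in one group and share a single `poll_once` call. If the key included anything per-instance or time-dependent (for example `id(self)`), every trigger would end up in its own group and the sharing would silently not happen.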