This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d2a635c6ed Add shared_stream_cohort_grace_period to reduce missed events on triggerer restart (#68888)
3d2a635c6ed is described below

commit 3d2a635c6edc7c0228b08d895a55fca947c98a7e
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 24 13:46:11 2026 +0800

    Add shared_stream_cohort_grace_period to reduce missed events on triggerer restart (#68888)
---
 .../authoring-and-scheduling/event-scheduling.rst  | 12 ++++
 airflow-core/newsfragments/68888.feature.rst       |  1 +
 .../src/airflow/config_templates/config.yml        | 13 ++++
 .../src/airflow/jobs/triggerer_job_runner.py       |  1 +
 airflow-core/src/airflow/triggers/shared_stream.py | 30 +++++++++
 airflow-core/tests/unit/jobs/test_triggerer_job.py |  6 ++
 .../tests/unit/triggers/test_shared_stream.py      | 75 ++++++++++++++++++++++
 7 files changed, 138 insertions(+)

diff --git a/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst 
b/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
index b60b39cf821..693632f6a75 100644
--- a/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
+++ b/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
@@ -410,6 +410,18 @@ only if the subscriber needs to run cleanup before failing.
 triggerer restart, the broker redelivers messages that were never
 advanced. Subscribers must therefore be idempotent.
 
+When multiple triggers sharing the same key restart together, the first
+to re-subscribe creates a fresh group and, by default, polling starts
+immediately. Triggers that re-subscribe later join as late subscribers
+(outside the snapshot of any already-broadcast event), so they miss any
+event broadcast before they join, including messages the broker
+redelivers after the restart. Set
+``[triggerer] shared_stream_cohort_grace_period`` to a positive number of
+seconds (e.g. ``2.0``) to delay the start of polling after a new group is
+created, giving concurrent re-subscriptions time to join before any event
+is broadcast. The delay applies to every newly created group, not only
+after a restart, so it also postpones the first poll for a brand-new key
+by the same amount. This is a best-effort window — it reduces but does
+not eliminate the risk of a slow-rejoining trigger missing events.
+
 **Durability**: the broker advance is gated on persistence. A subscriber's
 resolution completes only after every ``TriggerEvent`` it derived from the
 event has been stored in the metadata database; the confirmation reaches
diff --git a/airflow-core/newsfragments/68888.feature.rst 
b/airflow-core/newsfragments/68888.feature.rst
new file mode 100644
index 00000000000..456f9907e9d
--- /dev/null
+++ b/airflow-core/newsfragments/68888.feature.rst
@@ -0,0 +1 @@
+Add ``[triggerer] shared_stream_cohort_grace_period`` to delay the start of polling after a shared-stream group is created, reducing events missed by triggers that re-subscribe concurrently after a triggerer restart.
diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index a8ce5506c53..29763f422fe 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2978,6 +2978,19 @@ triggerer:
       type: float
       example: ~
       default: "300.0"
+    shared_stream_cohort_grace_period:
+      description: |
+        Seconds to delay the start of polling after a shared-stream group is created, giving triggers
+        that share the same key a window to subscribe before any event is broadcast. The default of 0
+        starts polling immediately (no delay). Set to a small positive value (e.g. 2.0–5.0) to reduce
+        the chance of triggers missing events on triggerer restart, when multiple triggers sharing a key
+        re-subscribe concurrently and the first to arrive would otherwise start the poll before the
+        others have joined. The delay applies to every newly created group, not only after a restart,
+        so it also postpones the first poll for a new key. This is a best-effort window: triggers that
+        take longer than the grace period to re-subscribe can still miss events broadcast before they
+        subscribe.
+      version_added: 3.3.0
+      type: float
+      example: ~
+      default: "0.0"
 kerberos:
   description: ~
   options:
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py 
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index b32ea53a2cd..b29978e52a9 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -1135,6 +1135,7 @@ class TriggerRunner:
             log=self.log,
             max_subscriber_queue=conf.getint("triggerer", 
"shared_stream_subscriber_queue_size"),
             ack_timeout=conf.getfloat("triggerer", 
"shared_stream_ack_timeout"),
+            cohort_grace_period=conf.getfloat("triggerer", 
"shared_stream_cohort_grace_period"),
         )
         self.blocked_main_thread_warning_threshold = conf.getfloat(
             "triggerer", "blocked_main_thread_warning_threshold"
diff --git a/airflow-core/src/airflow/triggers/shared_stream.py 
b/airflow-core/src/airflow/triggers/shared_stream.py
index 93ccc3e80da..4b8ae16c8fb 100644
--- a/airflow-core/src/airflow/triggers/shared_stream.py
+++ b/airflow-core/src/airflow/triggers/shared_stream.py
@@ -110,6 +110,18 @@ when a group stops while events are still awaiting persist 
confirmation
 event): the pending advances are abandoned and the broker redelivers
 those events.
 
+When multiple triggers sharing the same key restart together, the first
+to re-subscribe creates a fresh group and, by default, polling begins
+immediately. Triggers that re-subscribe later join as ordinary late
+subscribers (not counted in the snapshot of earlier events), so they miss
+any event broadcast before they join. Set
+``[triggerer] shared_stream_cohort_grace_period`` to a positive number of
+seconds to delay the start of polling after a group is created, giving
+concurrent re-subscriptions time to join before any event is broadcast.
+The delay applies to every newly created group, not only after a
+restart. This is a best-effort window: triggers that take longer than
+the grace period to re-subscribe may still miss events broadcast before
+they subscribe.
+
 **``shared_stream_subscriber_queue_size`` in ack mode**: the bound is
 still "unprocessed raw events per subscriber". The manager does **not**
 wait for outstanding resolutions before pulling the next event from the
@@ -508,6 +520,7 @@ class _SharedStreamGroup:
         on_poll_terminate: Callable[[_SharedStreamGroup], None],
         max_subscriber_queue: int,
         ack_timeout: float,
+        cohort_grace_period: float = 0.0,
         log: BoundLogger,
         _now: Callable[[], float] = time.monotonic,
     ) -> None:
@@ -518,6 +531,7 @@ class _SharedStreamGroup:
         self._on_poll_terminate = on_poll_terminate
         self._max_subscriber_queue = max_subscriber_queue
         self._ack_timeout = ack_timeout
+        self._cohort_grace_period = cohort_grace_period
         self._now = _now
         self._subscribers: dict[int, asyncio.Queue] = {}
         # Subscribers already force-failed (queue overflow or ack timeout);
@@ -685,6 +699,8 @@ class _SharedStreamGroup:
         producer: SharedStreamProducer | None = None
         terminal_exc: BaseException | None = None
         try:
+            if self._cohort_grace_period > 0:
+                await asyncio.sleep(self._cohort_grace_period)
             if ack_required:
                 # A factory failure flows through the terminal broadcast
                 # path below, like any other poll failure.
@@ -1161,6 +1177,17 @@ class SharedStreamManager:
 
     The manager is single-event-loop and not thread-safe. The triggerer's
     ``TriggerRunner`` is its sole owner.
+
+    :param log: Bound logger; defaults to the module logger.
+    :param max_subscriber_queue: Per-subscriber buffer size. A subscriber whose queue is full
+        is force-failed rather than blocking the poll loop.
+    :param ack_timeout: Per-event ack timeout in seconds. A subscriber that has not finished
+        processing an event within this window is force-failed via :class:`AckTimeout`.
+    :param cohort_grace_period: Seconds to delay the start of polling after a new group is
+        created. When > 0, the poll loop sleeps for this duration before calling
+        ``open_stream``/``open_shared_stream``, giving triggers that share the same key a window
+        to subscribe before any event is broadcast — useful on triggerer restart where concurrent
+        re-subscriptions would otherwise race against the first poll. Default 0 (no delay); any
+        value <= 0 likewise disables the delay.
     """
 
     def __init__(
@@ -1169,11 +1196,13 @@ class SharedStreamManager:
         log: BoundLogger | None = None,
         max_subscriber_queue: int = DEFAULT_SUBSCRIBER_QUEUE_MAX,
         ack_timeout: float = DEFAULT_ACK_TIMEOUT,
+        cohort_grace_period: float = 0.0,
         _now: Callable[[], float] = time.monotonic,
     ) -> None:
         self.log = log or structlog.get_logger(__name__)
         self._max_subscriber_queue = max_subscriber_queue
         self._ack_timeout = ack_timeout
+        self._cohort_grace_period = cohort_grace_period
         self._now = _now
         self._groups: dict[Hashable, _SharedStreamGroup] = {}
         # Allocator for trigger-event persist-confirmation seqs; unique per
@@ -1205,6 +1234,7 @@ class SharedStreamManager:
                 on_poll_terminate=self._handle_poll_terminate,
                 max_subscriber_queue=self._max_subscriber_queue,
                 ack_timeout=self._ack_timeout,
+                cohort_grace_period=self._cohort_grace_period,
                 log=self.log,
                 _now=self._now,
             )
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py 
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 4ae539600d4..0bfdbc171c2 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -991,6 +991,12 @@ class TestTriggerRunner:
         trigger_runner = TriggerRunner()
         assert trigger_runner._shared_streams._ack_timeout == 60.0
 
+    @conf_vars({("triggerer", "shared_stream_cohort_grace_period"): "3.0"})
+    def test_shared_stream_cohort_grace_period_config_wiring(self) -> None:
+        """[triggerer] shared_stream_cohort_grace_period is wired into 
SharedStreamManager."""
+        trigger_runner = TriggerRunner()
+        assert trigger_runner._shared_streams._cohort_grace_period == 3.0
+
     @pytest.mark.asyncio
     async def 
test_block_watchdog_does_not_log_when_threshold_is_not_exceeded(self) -> None:
         with conf_vars({("triggerer", 
"blocked_main_thread_warning_threshold"): "0.5"}):
diff --git a/airflow-core/tests/unit/triggers/test_shared_stream.py 
b/airflow-core/tests/unit/triggers/test_shared_stream.py
index 76baa8bff61..2c4c9bae5a1 100644
--- a/airflow-core/tests/unit/triggers/test_shared_stream.py
+++ b/airflow-core/tests/unit/triggers/test_shared_stream.py
@@ -1243,6 +1243,81 @@ async def 
test_late_subscriber_does_not_block_advance_of_earlier_event():
         await manager.unsubscribe(2, key)
 
 
[email protected]
+async def test_cohort_grace_period_includes_late_joining_subscriber():
+    """Subscribers that join during the cohort grace period are in the first 
event's snapshot."""
+    first_event_processed = asyncio.Event()
+
+    class _GraceProducer(SharedStreamProducer):
+        async def open_stream(self):
+            yield {"msg": "first"}, "p1"
+            # Fires after _poll has processed the yield and looped back for 
the next item.
+            first_event_processed.set()
+            await asyncio.Event().wait()
+
+        async def advance(self, batch):
+            pass
+
+    class _GraceTrigger(_ProgrammableSharedStreamTrigger):
+        @classmethod
+        def create_shared_stream_producer(cls, kwargs):
+            return _GraceProducer()
+
+        async def filter_shared_stream(self, shared_stream):
+            async for _ in shared_stream:
+                yield TriggerEvent({})
+
+    t1, t2 = _GraceTrigger(), _GraceTrigger()
+    key = t1.shared_stream_key()
+    manager = SharedStreamManager(cohort_grace_period=0.05)
+    try:
+        manager.subscribe(trigger_id=1, trigger=t1, key=key)
+        # Poll is sleeping through the grace period; t2 joins before polling 
starts.
+        manager.subscribe(trigger_id=2, trigger=t2, key=key)
+
+        await asyncio.wait_for(first_event_processed.wait(), timeout=1.0)
+
+        entry = next(iter(manager._groups[key]._outstanding.values()), None)
+        assert entry is not None
+        assert entry.pending == {1, 2}, "both subscribers must be in the 
initial cohort snapshot"
+    finally:
+        await manager.unsubscribe(1, key)
+        await manager.unsubscribe(2, key)
+
+
[email protected]
+async def test_cohort_grace_period_zero_starts_poll_immediately():
+    """With cohort_grace_period=0 (default), polling starts without any 
delay."""
+    poll_reached = asyncio.Event()
+
+    class _ImmediateProducer(SharedStreamProducer):
+        async def open_stream(self):
+            poll_reached.set()
+            await asyncio.Event().wait()
+            yield {}, "p"  # pragma: no cover
+
+        async def advance(self, batch):
+            pass
+
+    class _ImmediateTrigger(_ProgrammableSharedStreamTrigger):
+        @classmethod
+        def create_shared_stream_producer(cls, kwargs):
+            return _ImmediateProducer()
+
+        async def filter_shared_stream(self, shared_stream):
+            async for _ in shared_stream:
+                yield TriggerEvent({})  # pragma: no cover
+
+    t = _ImmediateTrigger()
+    key = t.shared_stream_key()
+    manager = SharedStreamManager(cohort_grace_period=0.0)
+    try:
+        manager.subscribe(trigger_id=1, trigger=t, key=key)
+        await asyncio.wait_for(poll_reached.wait(), timeout=0.5)
+    finally:
+        await manager.unsubscribe(1, key)
+
+
 @pytest.mark.asyncio
 async def test_subscriber_unsubscribe_during_outstanding_event():
     """When a subscriber leaves while still on an event, the group advances 
without it."""

Reply via email to