This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3d2a635c6ed Add shared_stream_cohort_grace_period to reduce missed
events on triggerer restart (#68888)
3d2a635c6ed is described below
commit 3d2a635c6edc7c0228b08d895a55fca947c98a7e
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 24 13:46:11 2026 +0800
Add shared_stream_cohort_grace_period to reduce missed events on triggerer
restart (#68888)
---
.../authoring-and-scheduling/event-scheduling.rst | 12 ++++
airflow-core/newsfragments/68888.feature.rst | 1 +
.../src/airflow/config_templates/config.yml | 13 ++++
.../src/airflow/jobs/triggerer_job_runner.py | 1 +
airflow-core/src/airflow/triggers/shared_stream.py | 30 +++++++++
airflow-core/tests/unit/jobs/test_triggerer_job.py | 6 ++
.../tests/unit/triggers/test_shared_stream.py | 75 ++++++++++++++++++++++
7 files changed, 138 insertions(+)
diff --git a/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
b/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
index b60b39cf821..693632f6a75 100644
--- a/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
+++ b/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
@@ -410,6 +410,18 @@ only if the subscriber needs to run cleanup before failing.
triggerer restart, the broker redelivers messages that were never
advanced. Subscribers must therefore be idempotent.
+When multiple triggers sharing the same key restart together, the first
+to re-subscribe creates a fresh group and, by default, polling starts immediately.
+Triggers that re-subscribe later join as late subscribers (outside the
+snapshot of any already-broadcast event), so they may miss events
+committed in the window between the first subscription and their own.
+Set ``[triggerer] shared_stream_cohort_grace_period`` to a positive
+number of seconds (e.g. ``2.0``) to delay the start of polling after a
+new group is created, giving concurrent re-subscriptions time to join
+before any event is broadcast. This is a best-effort window — it reduces
+but does not eliminate the risk of a slow-rejoining trigger missing
+events.
+
**Durability**: the broker advance is gated on persistence. A subscriber's
resolution completes only after every ``TriggerEvent`` it derived from the
event has been stored in the metadata database; the confirmation reaches
diff --git a/airflow-core/newsfragments/68888.feature.rst
b/airflow-core/newsfragments/68888.feature.rst
new file mode 100644
index 00000000000..456f9907e9d
--- /dev/null
+++ b/airflow-core/newsfragments/68888.feature.rst
@@ -0,0 +1 @@
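+Add ``[triggerer] shared_stream_cohort_grace_period`` to delay the start of polling after a shared-stream group is created, reducing the chance that triggers sharing a key miss events when they re-subscribe after a triggerer restart.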
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index a8ce5506c53..29763f422fe 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2978,6 +2978,19 @@ triggerer:
type: float
example: ~
default: "300.0"
+ shared_stream_cohort_grace_period:
+ description: |
+ Seconds to delay the start of polling after a shared-stream group is
created, giving triggers
+ that share the same key a window to subscribe before any event is
broadcast. The default of 0
+ starts polling immediately (no delay). Set to a small positive value
(e.g. 2.0–5.0) to reduce
+ the chance of triggers missing events on triggerer restart, when
multiple triggers sharing a key
+ re-subscribe concurrently and the first to arrive would otherwise
start the poll before the
+ others have joined. This is a best-effort window: triggers that take
longer than the grace period
+ to re-subscribe can still miss events committed after polling starts.
+ version_added: 3.3.0
+ type: float
+ example: ~
+ default: "0.0"
kerberos:
description: ~
options:
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index b32ea53a2cd..b29978e52a9 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -1135,6 +1135,7 @@ class TriggerRunner:
log=self.log,
max_subscriber_queue=conf.getint("triggerer",
"shared_stream_subscriber_queue_size"),
ack_timeout=conf.getfloat("triggerer",
"shared_stream_ack_timeout"),
+ cohort_grace_period=conf.getfloat("triggerer",
"shared_stream_cohort_grace_period"),
)
self.blocked_main_thread_warning_threshold = conf.getfloat(
"triggerer", "blocked_main_thread_warning_threshold"
diff --git a/airflow-core/src/airflow/triggers/shared_stream.py
b/airflow-core/src/airflow/triggers/shared_stream.py
index 93ccc3e80da..4b8ae16c8fb 100644
--- a/airflow-core/src/airflow/triggers/shared_stream.py
+++ b/airflow-core/src/airflow/triggers/shared_stream.py
@@ -110,6 +110,18 @@ when a group stops while events are still awaiting persist
confirmation
event): the pending advances are abandoned and the broker redelivers
those events.
+When multiple triggers sharing the same key restart together, the first
+to re-subscribe creates a fresh group and, by default, polling begins immediately.
+Triggers that re-subscribe later join as ordinary late subscribers (not
+counted in the snapshot of earlier events), so they may miss events
+committed during the window between the first subscription and their own.
+Set ``[triggerer] shared_stream_cohort_grace_period`` to a positive
+number of seconds to delay the start of polling after a group is created,
+giving concurrent re-subscriptions time to join before any event is
+broadcast. This is a best-effort window: triggers that take longer than
+the grace period to re-subscribe still miss events committed after
+polling starts.
+
**``shared_stream_subscriber_queue_size`` in ack mode**: the bound is
still "unprocessed raw events per subscriber". The manager does **not**
wait for outstanding resolutions before pulling the next event from the
@@ -508,6 +520,7 @@ class _SharedStreamGroup:
on_poll_terminate: Callable[[_SharedStreamGroup], None],
max_subscriber_queue: int,
ack_timeout: float,
+ cohort_grace_period: float = 0.0,
log: BoundLogger,
_now: Callable[[], float] = time.monotonic,
) -> None:
@@ -518,6 +531,7 @@ class _SharedStreamGroup:
self._on_poll_terminate = on_poll_terminate
self._max_subscriber_queue = max_subscriber_queue
self._ack_timeout = ack_timeout
+ self._cohort_grace_period = cohort_grace_period
self._now = _now
self._subscribers: dict[int, asyncio.Queue] = {}
# Subscribers already force-failed (queue overflow or ack timeout);
@@ -685,6 +699,8 @@ class _SharedStreamGroup:
producer: SharedStreamProducer | None = None
terminal_exc: BaseException | None = None
try:
+ if self._cohort_grace_period > 0:
+ await asyncio.sleep(self._cohort_grace_period)
if ack_required:
# A factory failure flows through the terminal broadcast
# path below, like any other poll failure.
@@ -1161,6 +1177,17 @@ class SharedStreamManager:
The manager is single-event-loop and not thread-safe. The triggerer's
``TriggerRunner`` is its sole owner.
+
+ :param log: Bound logger; defaults to the module logger.
+ :param max_subscriber_queue: Per-subscriber buffer size. A subscriber
whose queue is full
+ is force-failed rather than blocking the poll loop.
+ :param ack_timeout: Per-event ack timeout in seconds. A subscriber that
has not finished
+ processing an event within this window is force-failed via
:class:`AckTimeout`.
+ :param cohort_grace_period: Seconds to delay the start of polling after a
new group is
+ created. When > 0, the poll loop sleeps for this duration before
calling
+ ``open_stream``/``open_shared_stream``, giving triggers that share the
same key a window
+ to subscribe before any event is broadcast — useful on triggerer
restart where concurrent
+ re-subscriptions would otherwise race against the first poll. Default
0 (no delay).
"""
def __init__(
@@ -1169,11 +1196,13 @@ class SharedStreamManager:
log: BoundLogger | None = None,
max_subscriber_queue: int = DEFAULT_SUBSCRIBER_QUEUE_MAX,
ack_timeout: float = DEFAULT_ACK_TIMEOUT,
+ cohort_grace_period: float = 0.0,
_now: Callable[[], float] = time.monotonic,
) -> None:
self.log = log or structlog.get_logger(__name__)
self._max_subscriber_queue = max_subscriber_queue
self._ack_timeout = ack_timeout
+ self._cohort_grace_period = cohort_grace_period
self._now = _now
self._groups: dict[Hashable, _SharedStreamGroup] = {}
# Allocator for trigger-event persist-confirmation seqs; unique per
@@ -1205,6 +1234,7 @@ class SharedStreamManager:
on_poll_terminate=self._handle_poll_terminate,
max_subscriber_queue=self._max_subscriber_queue,
ack_timeout=self._ack_timeout,
+ cohort_grace_period=self._cohort_grace_period,
log=self.log,
_now=self._now,
)
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 4ae539600d4..0bfdbc171c2 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -991,6 +991,12 @@ class TestTriggerRunner:
trigger_runner = TriggerRunner()
assert trigger_runner._shared_streams._ack_timeout == 60.0
+ @conf_vars({("triggerer", "shared_stream_cohort_grace_period"): "3.0"})
+ def test_shared_stream_cohort_grace_period_config_wiring(self) -> None:
+ """[triggerer] shared_stream_cohort_grace_period is wired into
SharedStreamManager."""
+ trigger_runner = TriggerRunner()
+ assert trigger_runner._shared_streams._cohort_grace_period == 3.0
+
@pytest.mark.asyncio
async def
test_block_watchdog_does_not_log_when_threshold_is_not_exceeded(self) -> None:
with conf_vars({("triggerer",
"blocked_main_thread_warning_threshold"): "0.5"}):
diff --git a/airflow-core/tests/unit/triggers/test_shared_stream.py
b/airflow-core/tests/unit/triggers/test_shared_stream.py
index 76baa8bff61..2c4c9bae5a1 100644
--- a/airflow-core/tests/unit/triggers/test_shared_stream.py
+++ b/airflow-core/tests/unit/triggers/test_shared_stream.py
@@ -1243,6 +1243,81 @@ async def
test_late_subscriber_does_not_block_advance_of_earlier_event():
await manager.unsubscribe(2, key)
+@pytest.mark.asyncio
+async def test_cohort_grace_period_includes_late_joining_subscriber():
+ """Subscribers that join during the cohort grace period are in the first
event's snapshot."""
+ first_event_processed = asyncio.Event()
+
+ class _GraceProducer(SharedStreamProducer):
+ async def open_stream(self):
+ yield {"msg": "first"}, "p1"
+ # Fires after _poll has processed the yield and looped back for
the next item.
+ first_event_processed.set()
+ await asyncio.Event().wait()
+
+ async def advance(self, batch):
+ pass
+
+ class _GraceTrigger(_ProgrammableSharedStreamTrigger):
+ @classmethod
+ def create_shared_stream_producer(cls, kwargs):
+ return _GraceProducer()
+
+ async def filter_shared_stream(self, shared_stream):
+ async for _ in shared_stream:
+ yield TriggerEvent({})
+
+ t1, t2 = _GraceTrigger(), _GraceTrigger()
+ key = t1.shared_stream_key()
+ manager = SharedStreamManager(cohort_grace_period=0.05)
+ try:
+ manager.subscribe(trigger_id=1, trigger=t1, key=key)
+ # Poll is sleeping through the grace period; t2 joins before polling
starts.
+ manager.subscribe(trigger_id=2, trigger=t2, key=key)
+
+ await asyncio.wait_for(first_event_processed.wait(), timeout=1.0)
+
+ entry = next(iter(manager._groups[key]._outstanding.values()), None)
+ assert entry is not None
+ assert entry.pending == {1, 2}, "both subscribers must be in the
initial cohort snapshot"
+ finally:
+ await manager.unsubscribe(1, key)
+ await manager.unsubscribe(2, key)
+
+
+@pytest.mark.asyncio
+async def test_cohort_grace_period_zero_starts_poll_immediately():
+ """With cohort_grace_period=0 (default), polling starts without any
delay."""
+ poll_reached = asyncio.Event()
+
+ class _ImmediateProducer(SharedStreamProducer):
+ async def open_stream(self):
+ poll_reached.set()
+ await asyncio.Event().wait()
+ yield {}, "p" # pragma: no cover
+
+ async def advance(self, batch):
+ pass
+
+ class _ImmediateTrigger(_ProgrammableSharedStreamTrigger):
+ @classmethod
+ def create_shared_stream_producer(cls, kwargs):
+ return _ImmediateProducer()
+
+ async def filter_shared_stream(self, shared_stream):
+ async for _ in shared_stream:
+ yield TriggerEvent({}) # pragma: no cover
+
+ t = _ImmediateTrigger()
+ key = t.shared_stream_key()
+ manager = SharedStreamManager(cohort_grace_period=0.0)
+ try:
+ manager.subscribe(trigger_id=1, trigger=t, key=key)
+ await asyncio.wait_for(poll_reached.wait(), timeout=0.5)
+ finally:
+ await manager.unsubscribe(1, key)
+
+
@pytest.mark.asyncio
async def test_subscriber_unsubscribe_during_outstanding_event():
"""When a subscriber leaves while still on an event, the group advances
without it."""