jason810496 commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3411679546
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -901,12 +910,107 @@ async def _drive():
events = list(trigger_runner.events)
assert len(events) == 1
- trigger_id, event = events[0]
+ trigger_id, event, _seq = events[0]
assert trigger_id == 1
assert event.payload == {"region": "us"}
# Group is torn down on unsubscribe.
assert trigger_runner._shared_streams._groups == {}
+ def test_shared_stream_ack_mode_integration(self, session) -> None:
Review Comment:
```suggestion
def test_shared_stream_ack_mode_integration(self) -> None:
```
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -220,20 +732,280 @@ async def _poll(self) -> None:
# Drain stale events then put the failure sentinel so every
# subscriber wakes up even if its queue was at capacity.
self._drain_and_offer_failure(queue, failure)
+ # End of synchronous section; yields are safe from here on.
+ if cancelled_ack_task is not None:
+ with suppress(asyncio.CancelledError):
+ await cancelled_ack_task
+ if self._pump_task is not None:
+ # The pump exits by itself once it has dispatched everything
+ # already resolved and dispatchable; the suppress is defensive.
+ with suppress(asyncio.CancelledError):
+ await self._pump_task
+ if producer is not None:
+ try:
+ await producer.aclose()
+ except Exception as exc:
+ self.log.warning("Producer aclose failed", key=self.key,
exc_info=exc)
+
+ def _resolve_subscriber(
+ self,
+ *,
+ event_id: int,
+ trigger_id: int,
+ resolution: Literal["acked", "failed", "rejected"],
+ ) -> None:
+ """
+ Record one subscriber's resolution of one event.
+
+ Removes ``trigger_id`` from the pending set of ``event_id`` and adds
+ it to the matching outcome count. If the set empties, wakes the
+ advance pump — deleting the entry and dispatching the broker advance
+ are the pump's job, so advances stay in per-lane fan-out order.
Duplicate
+ calls for the same (event_id, trigger_id) are no-ops, which keeps
+ every subscriber counted exactly once per event.
+
+ This is the single cleanup point for the subscriber's resolution
+ bookkeeping: the binding is popped and its unconfirmed seqs leave
+ ``_seq_index``, so a persist confirmation arriving after the fact
+ (e.g. after an ack timeout already failed the subscriber) finds
+ nothing and no-ops.
+ """
+ entry = self._outstanding.get(event_id)
+ if entry is None or trigger_id not in entry.pending:
+ return # already resolved, already advanced, or never existed
+ entry.pending.discard(trigger_id)
+ binding = entry.bindings.pop(trigger_id, None)
+ if binding is not None:
+ for seq in binding.unconfirmed_seqs:
+ self._seq_index.pop(seq, None)
+ if resolution == "acked":
+ entry.acked += 1
+ elif resolution == "rejected":
+ entry.rejected += 1
+ else:
+ entry.failed += 1
+ if not entry.pending:
+ self._advance_wakeup.set()
+
+ def _reject_pending_event(self, *, trigger_id: int, event_id: int) -> None:
+ """
+ Resolve the subscriber's current open-window event as rejected.
+
+ Called by :func:`reject_shared_stream_event` while the subscriber's
+ binding window for ``event_id`` is open. Unlike a normal acceptance,
+ a reject resolves immediately — there is no derived trigger event, so
+ nothing has to be persisted first and the persist gate is skipped.
+
+ A subscriber that rejects an event and then still yields a real
+ trigger event for it is a subscriber-side logic error: the reject has
+ already popped the binding, so :meth:`bind_pending_event` returns
+ ``None`` and the late event simply does not bind to this raw event's
+ ack accounting (it still fires). The framework absorbs this safely
+ rather than raising.
+ """
+ self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id,
resolution="rejected")
+
+ def _maybe_complete(self, *, event_id: int, trigger_id: int) -> None:
+ """Resolve the subscriber as acked once the window is closed and all
persist confirmations are in."""
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ return
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return
+ if binding.window_closed and not binding.unconfirmed_seqs:
+ self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id,
resolution="acked")
+
+ def _close_binding_window(self, *, event_id: int, trigger_id: int) -> None:
+ """
+ Close the subscriber's binding window for one event.
+
+ Called by ``_ack_drain`` the moment the subscriber pulls the next
+ item — trigger events the subscriber emits from here on belong to
+ the next event, and this one can now resolve for the subscriber
+ (subject to persist confirmations).
+ """
+ if self._open_windows.get(trigger_id) == event_id:
+ del self._open_windows[trigger_id]
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ return
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return
+ binding.window_closed = True
+ self._maybe_complete(event_id=event_id, trigger_id=trigger_id)
+
+ def bind_pending_event(self, *, trigger_id: int, seq_counter:
Iterator[int]) -> int | None:
+ """
+ Bind one just-emitted trigger event to the subscriber's open binding
window.
+
+ Returns the seq the broker advance must wait on, or ``None`` when
+ there is nothing to bind to — no window open (not in ack mode, or
+ the event was emitted outside any raw event's window) or the
+ binding already resolved (force-failed). A seq is only drawn from
+ ``seq_counter`` when the event actually binds.
+ """
+ event_id = self._open_windows.get(trigger_id)
+ if event_id is None:
+ return None
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ return None
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return None
+ seq = next(seq_counter)
+ binding.unconfirmed_seqs.add(seq)
+ self._seq_index[seq] = (event_id, trigger_id)
+ return seq
+
+ def confirm_persisted(self, seqs: Iterable[int]) -> None:
+ """
+ Record persist confirmations for trigger event seqs bound in this
group.
+
+ Seqs that are not (or no longer) in ``_seq_index`` — another group's
+ seqs, or bindings that already resolved through timeout / overflow —
+ are ignored.
+ """
+ for seq in seqs:
+ bound = self._seq_index.pop(seq, None)
+ if bound is None:
+ continue
+ event_id, trigger_id = bound
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ continue
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ continue
+ binding.unconfirmed_seqs.discard(seq)
+ self._maybe_complete(event_id=event_id, trigger_id=trigger_id)
+
+ def _fail_subscriber_in_outstanding(self, trigger_id: int) -> None:
+ """
+ Resolve ``trigger_id`` as failed in every outstanding entry.
+
+ A force-failed subscriber (queue overflow or ack timeout) will never
+ resolve the events it still owes; leaving it pending in older entries
+ would head-block the ordered advance pump until each entry's own ack
+ timeout. This is the retroactive counterpart of excluding the
+ subscriber from future broadcasts via ``_failed_subscribers``.
+ """
+ for event_id in list(self._outstanding):
+ self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id,
resolution="failed")
+
+ async def _run_ack_timeout_loop(self) -> None:
+ """
+ Background task: force-fail subscribers whose resolution is overdue.
+
+ Cadence is ``max(0.01, ack_timeout / 10)`` — runs ten times per timeout
+ window, floored at 10 ms to avoid burning CPU when ``ack_timeout`` is
small.
+ """
+ cadence = max(0.01, self._ack_timeout / 10)
+ while True:
+ await asyncio.sleep(cadence)
+ now = self._now()
+ for event_id, entry in list(self._outstanding.items()):
+ if now - entry.created_at < self._ack_timeout:
+ continue
+ timed_out = set(entry.pending)
+ for trigger_id in timed_out:
+ queue = self._subscribers.get(trigger_id)
+ if queue is not None:
+ self.log.warning(
+ "Ack timeout; force-failing subscriber",
+ key=self.key,
+ trigger_id=trigger_id,
+ event_id=event_id,
+ )
+ self._drain_and_offer_failure(
+ queue,
+ _PollFailure(
+ AckTimeout(
+ f"shared stream {self.key!r} trigger
{trigger_id} "
+ f"did not finish processing event
{event_id} within "
+ f"{self._ack_timeout}s (still on the
event, or its "
+ "trigger events not yet confirmed
persisted)"
+ )
+ ),
+ )
+ self._failed_subscribers.add(trigger_id)
+ # Fail the subscriber out of every outstanding entry (not
+ # just this one); the resolution wakes the pump, which
+ # owns entry deletion and the ordered broker advance.
+ self._fail_subscriber_in_outstanding(trigger_id)
+
+ async def _ack_drain(self, trigger_id: int, queue: asyncio.Queue) ->
AsyncGenerator[Any, None]:
+ """
+ Ack-mode counterpart of :func:`_drain`, tracking the binding window.
+
+ Unwraps the internal ``(raw_event, event_id)`` queue items and
+ yields the bare raw event, so the subscriber sees the same stream
+ shape as on the fast path. Between the yield and the subscriber
+ pulling the next item, the event's binding window is open: trigger
+ events the subscriber emits bind to it via
+ :meth:`bind_pending_event`. The window closes the moment the
+ subscriber resumes this generator — before we wait on the queue
+ again — so a filtered-out event (nothing yielded) resolves as soon
+ as the filter loops back for the next raw event. Closing the window
+ is the subscriber's acceptance of the event; no explicit call is
+ involved.
+ """
+ while True:
+ item = await queue.get()
+ if isinstance(item, _PollFailure):
+ raise item.exc
+ raw_event, event_id = item
+ self._open_windows[trigger_id] = event_id
+ # Expose the open window to reject_shared_stream_event(), which the
+ # subscriber's filter may call instead of yielding. Clear it in
+ # ``finally`` so cancellation / exceptions also clear it and no
+ # window leaks across raw events. We assign ``None`` rather than
+ # ``reset(token)`` on purpose: each ``__anext__`` of this generator
+ # runs in its caller's context, which may be a different asyncio
+ # task than the one that set the value (the filter is resumed from
+ # a fresh task in some flows), and ``ContextVar.reset`` rejects a
+ # token created in another context. Windows never nest, so clearing
+ # to ``None`` is the correct restore.
+ _reject_context.set(_RejectContext(group=self,
trigger_id=trigger_id, event_id=event_id))
+ try:
+ yield raw_event
+ finally:
+ _reject_context.set(None)
+ self._close_binding_window(event_id=event_id,
trigger_id=trigger_id)
def subscribe(self, trigger_id: int) -> AsyncIterator[Any]:
"""Register ``trigger_id`` as a subscriber and return its raw event
stream."""
if trigger_id in self._subscribers:
raise RuntimeError(f"Trigger {trigger_id} already subscribed to
shared stream {self.key!r}")
queue: asyncio.Queue =
asyncio.Queue(maxsize=self._max_subscriber_queue)
self._subscribers[trigger_id] = queue
+ if self._ack_required:
+ return self._ack_drain(trigger_id, queue)
return _drain(queue)
def unsubscribe(self, trigger_id: int) -> None:
# Active subscribers exit through their consuming task being cancelled
# (Airflow's standard idiom); dropping the queue is enough here.
self._subscribers.pop(trigger_id, None)
- self._overflowed.discard(trigger_id)
+ self._failed_subscribers.discard(trigger_id)
+ self._open_windows.pop(trigger_id, None)
+ # Implicit resolution: leaving closes the subscriber's window on
+ # every outstanding event, so the producer never waits forever for a
+ # subscriber that has left. The broker advance still waits for
+ # persist confirmation of any trigger events this subscriber derived
+ # from the event; with nothing unconfirmed the subscriber resolves
+ # immediately.
+ for event_id, entry in list(self._outstanding.items()):
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ # No live binding (already resolved, or pre-ack-mode entry);
+ # _resolve_subscriber no-ops unless still pending.
+ self._resolve_subscriber(event_id=event_id,
trigger_id=trigger_id, resolution="acked")
+ continue
+ binding.window_closed = True
+ self._maybe_complete(event_id=event_id, trigger_id=trigger_id)
Review Comment:
On graceful triggerer shutdown, user cancellation, or trigger reassignment,
this can commit/delete broker messages with no persisted TriggerEvent.
Recommendation: Do not treat all unsubscribe paths as clean acceptance.
Track whether the subscriber actually consumed/moved past each event, and
resolve unseen or in-flight events as failed/redeliverable on
cancellation/shutdown; add tests for ack-mode shutdown with queued but
unprocessed events.
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -136,6 +295,177 @@ def __init__(self, exc: BaseException) -> None:
self.exc = exc
+class AckTimeout(Exception):
+ """
+ Raised in a subscriber that did not finish processing an event within the
per-event timeout.
+
+ The subscriber is either still on the event, or some of its derived
+ trigger events were never confirmed persisted. Treated the same as
+ :class:`_SubscriberOverflow` — the subscriber's trigger fails through
+ the standard trigger-failure path. Other subscribers in the same group
+ are unaffected; once they resolve, the producer advances normally.
+ """
+
+
+@dataclass(frozen=True, slots=True)
+class AdvanceOutcome:
+ """
+ Per-event resolution counts handed to :meth:`SharedStreamProducer.advance`.
+
+ Every subscriber that was online when the event was broadcast is counted
+ in exactly one field:
+
+ * ``acked`` — the subscriber moved past the event (pulled the next raw
+ event, or unsubscribed while the event was outstanding) and every
+ trigger event it derived from the event was confirmed persisted.
+ * ``failed`` — force-failed by the manager (ack timeout — including a
+ persist confirmation that never arrived — or queue overflow).
+ * ``rejected`` — the subscriber actively refused the event by calling
+ :func:`reject_shared_stream_event` from its filter. Terminal: the
+ broker should dead-letter / ``nack`` such events rather than redeliver
+ them, which is what distinguishes a reject from an involuntary
+ ``failed`` (where redelivery is the right response).
+
+ A producer reconciles these per-broker in
:meth:`SharedStreamProducer.advance`
+ — the framework only reports the counts. For example a Service Bus producer
+ dead-letters when ``rejected`` is non-zero, abandons (redelivers) when only
+ ``failed`` is non-zero, and completes when every subscriber accepted the
+ event; a Pub/Sub producer ``nack`` s on a reject and ``ack`` s otherwise.
+
+ An event broadcast while no subscribers were online carries all-zero
+ counts and is clean.
+ """
+
+ acked: int
+ failed: int
+ rejected: int = 0
+
+ @property
+ def is_clean(self) -> bool:
+ """Whether every subscriber accepted the event — no active reject and
no involuntary failure."""
+ return self.failed == 0 and self.rejected == 0
Review Comment:
I'm not sure will there be the case of zero subscriber. If yes, then we
should add the condition of `self.acked > 0`, otherwise we will delete the msg
even with zero subscriber.
```suggestion
return self.failed == 0 and self.rejected == 0 and self.acked > 0
```
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -80,19 +156,92 @@
from __future__ import annotations
import asyncio
-from collections.abc import AsyncGenerator, AsyncIterator, Callable, Hashable
+import itertools
+import time
+from abc import ABC, abstractmethod
+from collections import deque
+from collections.abc import AsyncGenerator, AsyncIterator, Callable, Hashable,
Iterable, Iterator, Sequence
from contextlib import suppress
-from typing import TYPE_CHECKING, Any
+from contextvars import ContextVar
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any, Literal, NamedTuple
import structlog
+from airflow.triggers.base import BaseEventTrigger
+
if TYPE_CHECKING:
from structlog.stdlib import BoundLogger
- from airflow.triggers.base import BaseEventTrigger
-
log = structlog.get_logger(__name__)
+__all__ = [
+ "AckTimeout",
+ "AdvanceItem",
+ "AdvanceOutcome",
+ "SharedStreamManager",
+ "SharedStreamProducer",
+ "reject_shared_stream_event",
+]
+
+
+class _RejectContext(NamedTuple):
+ """
+ The open binding window in scope while a subscriber's filter runs in ack
mode.
+
+ Set by :meth:`_SharedStreamGroup._ack_drain` for the duration of one raw
+ event's yield and read by :func:`reject_shared_stream_event`.
+ """
+
+ group: _SharedStreamGroup
+ trigger_id: int
+ event_id: int
+
+
+_reject_context: ContextVar[_RejectContext | None] =
ContextVar("shared_stream_reject_context", default=None)
+"""Task-local handle to the binding window of the raw event currently being
filtered.
+
+Only set while a subscriber's
:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream`
+is processing a raw event in ack mode; ``None`` otherwise (fast path,
standalone
+``run()``, or between raw events). See :func:`reject_shared_stream_event`.
+"""
+
+
+def reject_shared_stream_event() -> None:
+ """
+ Reject the shared-stream raw event the calling filter is currently
processing.
+
+ Call this from inside
+ :meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` when,
+ instead of yielding a :class:`~airflow.triggers.base.TriggerEvent`, you
+ want the broker to treat the raw event as terminally refused — dead-letter
+ it (Azure Service Bus), ``nack`` it (Pub/Sub), and so on. A reject is
+ distinct from an involuntary failure: it counts toward
+ :attr:`AdvanceOutcome.rejected` rather than ``failed``, so a producer can
+ dead-letter rejects while still redelivering failures.
+
+ The reject resolves the event immediately; there is no derived trigger
+ event to persist first.
+
+ Only meaningful while a raw event's binding window is open — that is,
+ inside ``filter_shared_stream`` of an ack-mode stream, right after the
+ filter received a raw event. Called anywhere else (the fast path,
+ standalone ``run()``, or between two raw events) it logs a warning and is a
+ no-op, because there is no broker advance to influence.
+
+ Invariant: this relies on the filter running in the same asyncio task as
+ the one driving the raw-event binding window (a task-local context variable
+ carries the open window). Driving the filter iteration from a different
task
+ (for example via :func:`asyncio.to_thread` or a freshly created task) would
+ make the open window invisible here and turn every reject into a no-op.
+ """
+ ctx = _reject_context.get()
+ if ctx is None:
+ log.warning("reject_shared_stream_event called outside an ack-mode
binding window; ignored")
+ return
+ ctx.group._reject_pending_event(trigger_id=ctx.trigger_id,
event_id=ctx.event_id)
+
Review Comment:
How about adding `log.info` to the success case for the further
observability?
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -150,24 +150,267 @@ In other words, the savings is at the poll-loop and
upstream-I/O layer, not at t
Suitable upstreams
^^^^^^^^^^^^^^^^^^
-The shared-stream channel is **one-way** today: events flow from
-``open_shared_stream`` out to each subscriber's ``filter_shared_stream``,
-and there is no way for a subscriber to tell the producer "I accepted /
-dropped / committed this event". That restricts the pattern to upstreams
-whose consumption does **not** depend on a side effect on a handle that
-only the producer holds. Good fits:
+Good fits for the shared-stream pattern:
* Idempotent / read-only reads — directory scans, polling REST APIs.
* Subscriber-side-effect cleanup, where the trigger's per-event action
(``unlink``, local marking, …) goes through APIs the subscriber owns
independently of the shared producer handle.
+* Message-broker upstreams (Kafka, SQS, Pub/Sub, Azure Service Bus) where
+ the producer must commit/delete/ack after all subscribers have processed
+ the message — use the ack channel described below.
+
+Producer-side ack channel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+For upstreams where the producer must advance (commit, delete, or ack) only
+after all subscribers have processed an event, override
+:meth:`~airflow.triggers.base.BaseEventTrigger.create_shared_stream_producer`
+to return a :class:`~airflow.triggers.shared_stream.SharedStreamProducer`.
+When this factory is overridden, the manager enters **ack mode**. The
+subscriber side does not change: ``filter_shared_stream`` receives raw
+events exactly as on the fast path, so the same filter code works in both
+modes — the framework infers when the broker may advance from each
+subscriber's consumption progress and from the persistence of the trigger
+events it derived.
+
+1. The manager calls ``create_shared_stream_producer(kwargs)`` once per
+ shared-stream group. The returned producer owns the broker connection for
+ the lifetime of one poll; open the connection lazily inside
+ ``open_stream``, not in the factory.
+2. The producer's ``open_stream`` yields ``(event, broker_payload)`` tuples,
+ where ``broker_payload`` is whatever the producer needs later (e.g. an
+ SQS receipt handle, a Kafka offset, a Pub/Sub ack ID).
+3. Each subscriber's ``filter_shared_stream`` receives raw events exactly
+ as in the fast path. A subscriber resolves an event once it has moved
+ past it (pulled the next raw event, or unsubscribed) and every
+ ``TriggerEvent`` it derived from the event has been persisted to the
+ metadata database.
+4. Once every subscriber in the fan-out set has resolved an event (by
+ moving past it, unsubscribing, timing out, or overflowing its queue),
+ the manager calls ``await producer.advance(batch)`` with the contiguous
+ prefix of fully resolved events in the event's lane — commit the
+ offsets, delete the SQS messages, etc. Each item in the batch is an
+ :class:`~airflow.triggers.shared_stream.AdvanceItem` carrying the
+ event's ``broker_payload`` and an
+ :class:`~airflow.triggers.shared_stream.AdvanceOutcome` with per-event
+ counts of how the subscribers resolved.
+
+**Rejecting an event**: a subscriber's filter can actively refuse a raw
+event instead of yielding a trigger event from it, by calling
+:func:`~airflow.triggers.shared_stream.reject_shared_stream_event` while
+processing that event. This is distinct from an involuntary failure. A
+``failed`` count means a subscriber did not finish in time (ack timeout) or
+fell behind (queue overflow) — the right response is usually redelivery. A
+``rejected`` count means a subscriber decided the event must not produce a
+trigger event and should be terminally discarded — the right response is to
+dead-letter or ``nack`` it, not redeliver. The
+:class:`~airflow.triggers.shared_stream.AdvanceOutcome` reports both
+counts separately so the producer can apply the right per-broker action in
+``advance``:
+
+* Azure Service Bus: dead-letter the message when ``rejected`` is non-zero,
+ abandon it (so the broker redelivers) when only ``failed`` is non-zero,
+ and complete it when every subscriber accepted the event.
+* Pub/Sub: ``nack`` the message on a reject, otherwise ``ack`` it.
+
+The framework only reports the counts — it never dead-letters, ``nack`` s,
+or redelivers on its own; that broker-specific decision lives entirely in
+the producer's ``advance``.
+
+``reject_shared_stream_event`` is meaningful only while the filter is
+processing a raw event in ack mode (the binding window of that event is
+open). Called on the fast path, from a standalone ``run()``, or between two
+raw events, it logs a warning and does nothing, because there is no broker
+advance to influence. Because it resolves the event immediately, there is
+nothing to persist and the reject does not wait on the persistence gate.
+
+``is_clean`` is ``True`` only when an event has no rejects and no failures —
+every subscriber that was online at broadcast accepted it. A single reject
+or a single failure makes ``is_clean`` ``False``.
+
+Example — reject inside a filter:
-Currently **not** in scope: Kafka consumers (regardless of commit mode),
-SQS with delete-on-process or visibility extension, and any source where
-progress on the producer's handle is tied to the subscriber's accept /
-reject decision. These sources need a way for the subscriber to signal
-acceptance back to the producer, which the current shared-stream API does
-not provide.
+.. code-block:: python
+
+ from airflow.triggers.shared_stream import reject_shared_stream_event
+
+
+ async def filter_shared_stream(self, shared_stream):
+ async for raw in shared_stream:
+ if raw.get("malformed"):
+ # Never produce a trigger event from this; have the broker
+ # dead-letter it rather than redeliver it.
+ reject_shared_stream_event()
+ continue
+ yield TriggerEvent(raw)
+
+**Ordering guarantee**: by default every event belongs to the same lane.
+The items of a batch are in event order and form their lane's contiguous
+resolved prefix; within a lane, batches arrive strictly in order — the
+next ``advance`` is awaited only after the previous call returned (or
+failed and was logged). A producer can override ``get_advance_lane`` to
+narrow that ordering to within a lane: events whose lane values compare
+equal are batched and advanced in event order relative to each other,
+while events in different lanes do not wait for one another. Either way,
+at most one ``advance`` call is awaited at a time, and cumulative schemes
+such as a Kafka offset commit only need to commit the batch's last item.
+When the poll ends, the manager calls ``await producer.aclose()`` once,
+best-effort.
+
+Example — SQS-like producer:
+
+.. code-block:: python
+
+ from collections.abc import AsyncIterator, Sequence
+ from typing import Any
+
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+ from airflow.triggers.shared_stream import AdvanceItem,
SharedStreamProducer
+
+
+ class SqsSharedStreamProducer(SharedStreamProducer):
+ def __init__(self, queue_url: str):
+ self.queue_url = queue_url
+ self.client = None
+
+ async def open_stream(self) -> AsyncIterator[tuple[Any, Any]]:
+ # Open the connection here, not in the trigger's factory.
+ self.client = await create_sqs_client()
+ while True:
+ messages = await poll_sqs(self.client, self.queue_url)
+ for msg in messages:
+ yield msg["Body"], msg["ReceiptHandle"]
+
+ async def advance(self, batch: Sequence[AdvanceItem]) -> None:
+ # Called with one lane's batch of fully resolved messages.
+ for receipt_handle, outcome in batch:
+ if outcome.is_clean:
+ await delete_sqs_message(self.client, self.queue_url,
receipt_handle)
+ # Otherwise leave the message for the visibility timeout to
redeliver.
+
+ async def aclose(self) -> None:
+ if self.client is not None:
+ await self.client.close()
+
+
+ class SqsSharedTrigger(BaseEventTrigger):
+ def __init__(self, *, queue_url: str, region: str | None = None):
+ super().__init__()
+ self.queue_url = queue_url
+ self.region = region
+
+ def serialize(self):
+ return (
+ f"{type(self).__module__}.{type(self).__qualname__}",
+ {"queue_url": self.queue_url, "region": self.region},
+ )
+
+ def shared_stream_key(self):
+ return ("sqs", self.queue_url)
+
+ @classmethod
+ def create_shared_stream_producer(cls, kwargs) ->
SqsSharedStreamProducer:
+ return SqsSharedStreamProducer(kwargs["queue_url"])
+
+ async def filter_shared_stream(self, shared_stream):
+ async for raw in shared_stream:
+ if self.region is None or raw.get("region") == self.region:
+ yield TriggerEvent(raw)
+
+ async def run(self):
+ yield TriggerEvent({})
+
+Example — Kafka cumulative commit across partitions. A Kafka commit
+acknowledges every offset up to the committed one within a partition, so
+it is only safe if no later event from the same partition can be committed
+while an earlier one is still pending — events on other partitions do not
+matter. Returning ``(topic, partition)`` from ``get_advance_lane`` narrows
+the ordering guarantee to exactly that granularity: each partition's
+commits stay in order, and a slow partition no longer delays commits on
+the other partitions:
+
+.. code-block:: python
+
+ class KafkaSharedStreamProducer(SharedStreamProducer):
+ def __init__(self, topics: list[str]):
+ self.topics = topics
+ self.consumer = None
+
+ async def open_stream(self):
+ # Auto-commit must be off (the Kafka default is on), or the
+ # consumer commits on its own schedule and the ack channel
+ # no longer controls what the broker considers delivered.
+ self.consumer = await create_kafka_consumer(self.topics,
enable_auto_commit=False)
+ async for message in self.consumer:
+ yield message.value, (message.topic, message.partition,
message.offset)
+
+ def get_advance_lane(self, broker_payload):
+ topic, partition, _offset = broker_payload
+ return topic, partition
+
+ async def advance(self, batch):
+ # The batch is one lane's — here, one partition's — contiguous
+ # resolved prefix, in event order, so committing the offset of
+ # its last item covers the whole batch and can never skip past
+ # an event that is still pending.
+ topic, partition, offset = batch[-1].broker_payload
+ await self.consumer.commit(topic, partition, offset + 1)
Review Comment:
IIUC, we need to respect the `outcome` (`AdvanceItem`) instead of directly
commit the offset.
For example:
```python
to_dlq = []
for broker_payload, outcome in batch:
if not outcome.is_clean:
to_dlq.append(broker_payload)
handle_dlq(to_dlq)
topic, partition, offset = batch[-1].broker_payload
await self.consumer.commit(topic, partition, offset + 1)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]