jason810496 commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3404281741
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1417,6 +1447,8 @@ async def sync_state_to_supervisor(self, finished_ids:
list[int]) -> None:
if resp:
self.to_create.extend(resp.to_create)
self.to_cancel.extend(resp.to_cancel)
+ if resp.events_persisted:
+ self._shared_streams.confirm_persisted(resp.events_persisted)
Review Comment:
From my perspective, the at-least-once semantics that we need to achieve
are already ensured at this point.
We can safely execute `advance` once the events have been persisted.
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -220,20 +674,264 @@ async def _poll(self) -> None:
# Drain stale events then put the failure sentinel so every
# subscriber wakes up even if its queue was at capacity.
self._drain_and_offer_failure(queue, failure)
+ # End of synchronous section; yields are safe from here on.
+ if cancelled_ack_task is not None:
+ with suppress(asyncio.CancelledError):
+ await cancelled_ack_task
+ if self._pump_task is not None:
+ # The pump exits by itself once it has dispatched everything
+ # already resolved and dispatchable; the suppress is defensive.
+ with suppress(asyncio.CancelledError):
+ await self._pump_task
+ if producer is not None:
+ try:
+ await producer.aclose()
+ except Exception as exc:
+ self.log.warning("Producer aclose failed", key=self.key,
exc_info=exc)
+
+ def _resolve_subscriber(
+ self,
+ *,
+ event_id: int,
+ trigger_id: int,
+ resolution: Literal["acked", "nacked", "failed"],
+ ) -> None:
+ """
+ Record one subscriber's resolution of one event.
+
+ Removes ``trigger_id`` from the pending set of ``event_id`` and adds
+ it to the matching outcome count. If the set empties, wakes the
+ advance pump — deleting the entry and dispatching the broker advance
+ are the pump's job, so advances stay in per-lane fan-out order.
Duplicate
+ calls for the same (event_id, trigger_id) are no-ops, which keeps
+ every subscriber counted exactly once per event.
+
+ This is the single cleanup point for the subscriber's ack
+ bookkeeping: the binding is popped and its unconfirmed seqs leave
+ ``_seq_index``, so a persist confirmation arriving after the fact
+ (e.g. after an ack timeout already failed the subscriber) finds
+ nothing and no-ops.
+ """
+ entry = self._outstanding.get(event_id)
+ if entry is None or trigger_id not in entry.pending:
+ return # already resolved, already advanced, or never existed
+ entry.pending.discard(trigger_id)
+ binding = entry.bindings.pop(trigger_id, None)
+ if binding is not None:
+ for seq in binding.unconfirmed_seqs:
+ self._seq_index.pop(seq, None)
+ if resolution == "acked":
+ entry.acked += 1
+ elif resolution == "nacked":
+ entry.nacked += 1
+ else:
+ entry.failed += 1
+ if not entry.pending:
+ self._advance_wakeup.set()
+
+ def _record_ack(self, *, event_id: int, trigger_id: int) -> None:
+ """
+ Record that the subscriber called ``ack()`` for one event.
+
+ The subscriber resolves as acked only once the binding window is
+ closed and every bound seq is confirmed persisted; until then the
+ event stays outstanding (the ack timeout is the backstop).
+ """
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ return
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return # already resolved (e.g. force-failed before the ack
landed)
+ binding.acked = True
+ self._maybe_complete(event_id=event_id, trigger_id=trigger_id)
+
+ def _record_nack(self, *, event_id: int, trigger_id: int) -> None:
+ """Resolve the subscriber as nacked immediately; no persistence gating
applies."""
+ self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id,
resolution="nacked")
+
+ def _maybe_complete(self, *, event_id: int, trigger_id: int) -> None:
+ """Resolve the subscriber as acked once ack, window close, and all
persist confirmations are in."""
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ return
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return
+ if binding.acked and binding.window_closed and not
binding.unconfirmed_seqs:
+ self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id,
resolution="acked")
+
+ def _close_binding_window(self, token: AckToken) -> None:
+ """
+ Close the binding window of one yielded token.
+
+ Called by ``_ack_drain`` the moment the subscriber pulls the next
+ item — trigger events the subscriber emits from here on belong to
+ the next token, and an ack recorded for this one can now complete
+ (subject to persist confirmations).
+ """
+ trigger_id = token._trigger_id
+ if self._current_token.get(trigger_id) is token:
+ del self._current_token[trigger_id]
+ entry = self._outstanding.get(token._event_id)
+ if entry is None:
+ return
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return
+ binding.window_closed = True
+ self._maybe_complete(event_id=token._event_id, trigger_id=trigger_id)
+
+ def bind_pending_event(self, *, trigger_id: int, seq_counter:
Iterator[int]) -> int | None:
+ """
+ Bind one just-emitted trigger event to the subscriber's open binding
window.
+
+ Returns the seq the broker advance must wait on, or ``None`` when
+ there is nothing to bind to — no window open (not in ack mode, or
+ the event was emitted outside any raw event's window) or the
+ binding already resolved (nacked or force-failed). A seq is only
+ drawn from ``seq_counter`` when the event actually binds.
+ """
+ token = self._current_token.get(trigger_id)
+ if token is None:
+ return None
+ entry = self._outstanding.get(token._event_id)
+ if entry is None:
+ return None
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return None
+ seq = next(seq_counter)
+ binding.unconfirmed_seqs.add(seq)
+ self._seq_index[seq] = (token._event_id, trigger_id)
+ return seq
+
+ def confirm_persisted(self, seqs: Iterable[int]) -> None:
+ """
+ Record persist confirmations for trigger event seqs bound in this
group.
+
+ Seqs that are not (or no longer) in ``_seq_index`` — another group's
+ seqs, or bindings that already resolved through nack / timeout —
+ are ignored.
+ """
+ for seq in seqs:
+ bound = self._seq_index.pop(seq, None)
+ if bound is None:
+ continue
+ event_id, trigger_id = bound
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ continue
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ continue
Review Comment:
nit: Perhaps use the `:=` operator for all of these cases?
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -150,24 +150,217 @@ In other words, the savings is at the poll-loop and
upstream-I/O layer, not at t
Suitable upstreams
^^^^^^^^^^^^^^^^^^
-The shared-stream channel is **one-way** today: events flow from
-``open_shared_stream`` out to each subscriber's ``filter_shared_stream``,
-and there is no way for a subscriber to tell the producer "I accepted /
-dropped / committed this event". That restricts the pattern to upstreams
-whose consumption does **not** depend on a side effect on a handle that
-only the producer holds. Good fits:
+Good fits for the shared-stream pattern:
* Idempotent / read-only reads — directory scans, polling REST APIs.
* Subscriber-side-effect cleanup, where the trigger's per-event action
(``unlink``, local marking, …) goes through APIs the subscriber owns
independently of the shared producer handle.
+* Message-broker upstreams (Kafka, SQS, Pub/Sub, Azure Service Bus) where
+ the producer must commit/delete/ack after all subscribers have processed
+ the message — use the ack channel described below.
+
+Producer-side ack channel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+For upstreams where the producer must advance (commit, delete, or ack) only
+after all subscribers have processed an event, override
+:meth:`~airflow.triggers.base.BaseEventTrigger.create_shared_stream_producer`
+to return a :class:`~airflow.triggers.shared_stream.SharedStreamProducer`.
+When this factory is overridden, the manager enters **ack mode**:
+
+1. The manager calls ``create_shared_stream_producer(kwargs)`` once per
+ shared-stream group. The returned producer owns the broker connection for
+ the lifetime of one poll; open the connection lazily inside
+ ``open_stream``, not in the factory.
+2. The producer's ``open_stream`` yields ``(event, broker_payload)`` tuples,
+ where ``broker_payload`` is whatever the producer needs later (e.g. an
+ SQS receipt handle, a Kafka offset, a Pub/Sub ack ID).
+3. Each subscriber's ``filter_shared_stream`` receives ``(event, token)``
+ pairs. The subscriber calls ``await token.ack()`` once it has accepted
+ the event, or ``await token.nack()`` to opt out.
+4. Once every subscriber in the fan-out set has resolved an event (by
+ ``ack()``, ``nack()``, unsubscribing, timing out, or overflowing its
+ queue) **and** every ``TriggerEvent`` the subscribers derived from it
+ has been persisted to the metadata database, the manager calls
+ ``await producer.advance(broker_payload, outcome)`` — commit the offset,
+ delete the SQS message, etc. The ``outcome`` is an
+ :class:`~airflow.triggers.shared_stream.AdvanceOutcome` carrying
+ per-event counts of how the subscribers resolved.
+
+**Ordering guarantee**: by default every event belongs to the same lane,
+so advance calls are dispatched strictly in event order — ``advance`` for
+event N is awaited only after event N-1's ``advance`` returned (or failed
+and was logged). A producer can override ``get_advance_lane`` to narrow
+that ordering to within a lane: events whose lane values compare equal
+are advanced in event order relative to each other, while events in
+different lanes do not wait for one another. Either way, at most one
+``advance`` call is awaited at a time. When the poll ends, the manager
+calls ``await producer.aclose()`` once, best-effort.
+
+Example — SQS-like producer:
-Currently **not** in scope: Kafka consumers (regardless of commit mode),
-SQS with delete-on-process or visibility extension, and any source where
-progress on the producer's handle is tied to the subscriber's accept /
-reject decision. These sources need a way for the subscriber to signal
-acceptance back to the producer, which the current shared-stream API does
-not provide.
+.. code-block:: python
+
+ from collections.abc import AsyncIterator
+ from typing import Any
+
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+ from airflow.triggers.shared_stream import AdvanceOutcome,
SharedStreamProducer
+
+
+ class SqsSharedStreamProducer(SharedStreamProducer):
+ def __init__(self, queue_url: str):
+ self.queue_url = queue_url
+ self.client = None
+
+ async def open_stream(self) -> AsyncIterator[tuple[Any, Any]]:
+ # Open the connection here, not in the trigger's factory.
+ self.client = await create_sqs_client()
+ while True:
+ messages = await poll_sqs(self.client, self.queue_url)
+ for msg in messages:
+ yield msg["Body"], msg["ReceiptHandle"]
+
+ async def advance(self, broker_payload: Any, outcome: AdvanceOutcome)
-> None:
+ # Called once all subscribers have resolved this message.
+ if outcome.is_clean:
+ await delete_sqs_message(self.client, self.queue_url,
broker_payload)
+ # Otherwise leave the message for the visibility timeout to
redeliver.
+
+ async def aclose(self) -> None:
+ if self.client is not None:
+ await self.client.close()
+
+
+ class SqsSharedTrigger(BaseEventTrigger):
+ def __init__(self, *, queue_url: str, region: str | None = None):
+ super().__init__()
+ self.queue_url = queue_url
+ self.region = region
+
+ def serialize(self):
+ return (
+ f"{type(self).__module__}.{type(self).__qualname__}",
+ {"queue_url": self.queue_url, "region": self.region},
+ )
+
+ def shared_stream_key(self):
+ return ("sqs", self.queue_url)
+
+ @classmethod
+ def create_shared_stream_producer(cls, kwargs) ->
SqsSharedStreamProducer:
+ return SqsSharedStreamProducer(kwargs["queue_url"])
+
+ async def filter_shared_stream(self, shared_stream):
+ async for raw, token in shared_stream:
+ if self.region is None or raw.get("region") == self.region:
+ await token.ack()
+ yield TriggerEvent(raw)
+ else:
+ await token.nack()
+
+ async def run(self):
+ yield TriggerEvent({})
+
+Example — Kafka cumulative commit across partitions. A Kafka commit
+acknowledges every offset up to the committed one within a partition, so
+it is only safe if no later event from the same partition can be committed
+while an earlier one is still pending — events on other partitions do not
+matter. Returning ``(topic, partition)`` from ``get_advance_lane`` narrows
+the ordering guarantee to exactly that granularity: each partition's
+commits stay in order, and a slow partition no longer delays commits on
+the other partitions:
+
+.. code-block:: python
+
+ class KafkaSharedStreamProducer(SharedStreamProducer):
+ def __init__(self, topics: list[str]):
+ self.topics = topics
+ self.consumer = None
+
+ async def open_stream(self):
+ # Auto-commit must be off (the Kafka default is on), or the
+ # consumer commits on its own schedule and the ack channel
+ # no longer controls what the broker considers delivered.
+ self.consumer = await create_kafka_consumer(self.topics,
enable_auto_commit=False)
+ async for message in self.consumer:
+ yield message.value, (message.topic, message.partition,
message.offset)
+
+ def get_advance_lane(self, broker_payload):
+ topic, partition, _offset = broker_payload
+ return topic, partition
Review Comment:
IIUC, if we define the `KafkaSharedStreamProducer` with a `shared_stream_key`
per Kafka partition, we can avoid the new `get_advance_lane` method altogether.
Since the messages we get from the same partition are always in order, we should
leverage the ordering semantics Kafka gives us directly (via logical separation);
IMHO there is no need to interleave the messages and then add another interface
to enforce the ordering ourselves.
`get_advance_lane` might make sense at some point, but I'm concerned about
shipping it as a public interface (it will then be much more difficult to get
rid of).
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -163,17 +445,45 @@ def __init__(
kwargs: dict[str, Any],
on_poll_terminate: Callable[[_SharedStreamGroup], None],
max_subscriber_queue: int,
+ ack_timeout: float,
log: BoundLogger,
+ _now: Callable[[], float] = time.monotonic,
) -> None:
self.key = key
self.trigger_class = trigger_class
self.kwargs = kwargs
self.log = log
self._on_poll_terminate = on_poll_terminate
self._max_subscriber_queue = max_subscriber_queue
+ self._ack_timeout = ack_timeout
+ self._now = _now
self._subscribers: dict[int, asyncio.Queue] = {}
- self._overflowed: set[int] = set()
+ # Subscribers already force-failed (queue overflow or ack timeout);
+ # excluded from subsequent broadcasts until they unsubscribe.
+ self._failed_subscribers: set[int] = set()
self._poll_task: asyncio.Task | None = None
+ # Constant for the group's lifetime: trigger_class never changes.
+ self._ack_required: bool = self._is_ack_required()
+ # Ack mode state — populated only when create_shared_stream_producer
+ # is overridden.
+ self._outstanding: dict[int, _OutstandingEntry] = {}
+ # Per-lane FIFO index over _outstanding: event ids in fan-out order,
+ # keyed by the lane each event's producer assigned at fan-out.
+ self._lane_queues: dict[Hashable, deque[int]] = {}
+ # seq -> (event_id, trigger_id) for trigger events awaiting persist
+ # confirmation; entries are removed on confirmation or when the
+ # owning binding resolves (late confirmations then no-op).
+ self._seq_index: dict[int, tuple[int, int]] = {}
+ # Per subscriber: the token whose binding window is currently open —
+ # trigger events the subscriber emits now bind to this token.
+ self._current_token: dict[int, AckToken] = {}
+ self._next_event_id: int = 0
+ self._ack_timeout_task: asyncio.Task | None = None
+ # Advance pump: a single task dispatches broker advances in per-lane
+ # fan-out order, woken through this event whenever an entry resolves.
+ self._advance_wakeup: asyncio.Event = asyncio.Event()
+ self._pump_stopping: bool = False
+ self._pump_task: asyncio.Task | None = None
Review Comment:
Without the public `ack` / `nack` interface, we _might_ have a chance to
simplify the state down to what we really need for ack-after-persist.
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -150,24 +150,217 @@ In other words, the savings is at the poll-loop and
upstream-I/O layer, not at t
Suitable upstreams
^^^^^^^^^^^^^^^^^^
-The shared-stream channel is **one-way** today: events flow from
-``open_shared_stream`` out to each subscriber's ``filter_shared_stream``,
-and there is no way for a subscriber to tell the producer "I accepted /
-dropped / committed this event". That restricts the pattern to upstreams
-whose consumption does **not** depend on a side effect on a handle that
-only the producer holds. Good fits:
+Good fits for the shared-stream pattern:
* Idempotent / read-only reads — directory scans, polling REST APIs.
* Subscriber-side-effect cleanup, where the trigger's per-event action
(``unlink``, local marking, …) goes through APIs the subscriber owns
independently of the shared producer handle.
+* Message-broker upstreams (Kafka, SQS, Pub/Sub, Azure Service Bus) where
+ the producer must commit/delete/ack after all subscribers have processed
+ the message — use the ack channel described below.
+
+Producer-side ack channel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+For upstreams where the producer must advance (commit, delete, or ack) only
+after all subscribers have processed an event, override
+:meth:`~airflow.triggers.base.BaseEventTrigger.create_shared_stream_producer`
+to return a :class:`~airflow.triggers.shared_stream.SharedStreamProducer`.
+When this factory is overridden, the manager enters **ack mode**:
+
+1. The manager calls ``create_shared_stream_producer(kwargs)`` once per
+ shared-stream group. The returned producer owns the broker connection for
+ the lifetime of one poll; open the connection lazily inside
+ ``open_stream``, not in the factory.
+2. The producer's ``open_stream`` yields ``(event, broker_payload)`` tuples,
+ where ``broker_payload`` is whatever the producer needs later (e.g. an
+ SQS receipt handle, a Kafka offset, a Pub/Sub ack ID).
+3. Each subscriber's ``filter_shared_stream`` receives ``(event, token)``
+ pairs. The subscriber calls ``await token.ack()`` once it has accepted
+ the event, or ``await token.nack()`` to opt out.
+4. Once every subscriber in the fan-out set has resolved an event (by
+ ``ack()``, ``nack()``, unsubscribing, timing out, or overflowing its
+ queue) **and** every ``TriggerEvent`` the subscribers derived from it
+ has been persisted to the metadata database, the manager calls
+ ``await producer.advance(broker_payload, outcome)`` — commit the offset,
+ delete the SQS message, etc. The ``outcome`` is an
+ :class:`~airflow.triggers.shared_stream.AdvanceOutcome` carrying
+ per-event counts of how the subscribers resolved.
+
+**Ordering guarantee**: by default every event belongs to the same lane,
+so advance calls are dispatched strictly in event order — ``advance`` for
+event N is awaited only after event N-1's ``advance`` returned (or failed
+and was logged). A producer can override ``get_advance_lane`` to narrow
+that ordering to within a lane: events whose lane values compare equal
+are advanced in event order relative to each other, while events in
+different lanes do not wait for one another. Either way, at most one
+``advance`` call is awaited at a time. When the poll ends, the manager
+calls ``await producer.aclose()`` once, best-effort.
+
+Example — SQS-like producer:
-Currently **not** in scope: Kafka consumers (regardless of commit mode),
-SQS with delete-on-process or visibility extension, and any source where
-progress on the producer's handle is tied to the subscriber's accept /
-reject decision. These sources need a way for the subscriber to signal
-acceptance back to the producer, which the current shared-stream API does
-not provide.
+.. code-block:: python
+
+ from collections.abc import AsyncIterator
+ from typing import Any
+
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+ from airflow.triggers.shared_stream import AdvanceOutcome,
SharedStreamProducer
+
+
+ class SqsSharedStreamProducer(SharedStreamProducer):
+ def __init__(self, queue_url: str):
+ self.queue_url = queue_url
+ self.client = None
+
+ async def open_stream(self) -> AsyncIterator[tuple[Any, Any]]:
+ # Open the connection here, not in the trigger's factory.
+ self.client = await create_sqs_client()
+ while True:
+ messages = await poll_sqs(self.client, self.queue_url)
+ for msg in messages:
+ yield msg["Body"], msg["ReceiptHandle"]
+
+ async def advance(self, broker_payload: Any, outcome: AdvanceOutcome)
-> None:
+ # Called once all subscribers have resolved this message.
+ if outcome.is_clean:
+ await delete_sqs_message(self.client, self.queue_url,
broker_payload)
+ # Otherwise leave the message for the visibility timeout to
redeliver.
+
+ async def aclose(self) -> None:
+ if self.client is not None:
+ await self.client.close()
+
+
+ class SqsSharedTrigger(BaseEventTrigger):
+ def __init__(self, *, queue_url: str, region: str | None = None):
+ super().__init__()
+ self.queue_url = queue_url
+ self.region = region
+
+ def serialize(self):
+ return (
+ f"{type(self).__module__}.{type(self).__qualname__}",
+ {"queue_url": self.queue_url, "region": self.region},
+ )
+
+ def shared_stream_key(self):
+ return ("sqs", self.queue_url)
+
+ @classmethod
+ def create_shared_stream_producer(cls, kwargs) ->
SqsSharedStreamProducer:
+ return SqsSharedStreamProducer(kwargs["queue_url"])
+
+ async def filter_shared_stream(self, shared_stream):
+ async for raw, token in shared_stream:
+ if self.region is None or raw.get("region") == self.region:
+ await token.ack()
+ yield TriggerEvent(raw)
+ else:
+ await token.nack()
+
+ async def run(self):
+ yield TriggerEvent({})
+
+Example — Kafka cumulative commit across partitions. A Kafka commit
+acknowledges every offset up to the committed one within a partition, so
+it is only safe if no later event from the same partition can be committed
+while an earlier one is still pending — events on other partitions do not
+matter. Returning ``(topic, partition)`` from ``get_advance_lane`` narrows
+the ordering guarantee to exactly that granularity: each partition's
+commits stay in order, and a slow partition no longer delays commits on
+the other partitions:
+
+.. code-block:: python
+
+ class KafkaSharedStreamProducer(SharedStreamProducer):
+ def __init__(self, topics: list[str]):
+ self.topics = topics
+ self.consumer = None
+
+ async def open_stream(self):
+ # Auto-commit must be off (the Kafka default is on), or the
+ # consumer commits on its own schedule and the ack channel
+ # no longer controls what the broker considers delivered.
+ self.consumer = await create_kafka_consumer(self.topics,
enable_auto_commit=False)
+ async for message in self.consumer:
+ yield message.value, (message.topic, message.partition,
message.offset)
+
+ def get_advance_lane(self, broker_payload):
+ topic, partition, _offset = broker_payload
+ return topic, partition
+
+ async def advance(self, broker_payload, outcome):
Review Comment:
Sorry that I didn't catch this in the first place.
Would it be better to make the per-event `advance` interface batched and
in-order per lane (e.g. `advance(batch: list[(broker_payload, outcome)])`)?
IIUC, committing message-by-message is an anti-pattern for Kafka.
---
Here's the context that I just double-checked with the Google AI summary:
Avoid committing every message: While committing message-by-message
maximizes data safety, it creates massive network overhead. Consider processing
and committing in batches if your throughput is high.
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -150,24 +150,217 @@ In other words, the savings is at the poll-loop and
upstream-I/O layer, not at t
Suitable upstreams
^^^^^^^^^^^^^^^^^^
-The shared-stream channel is **one-way** today: events flow from
-``open_shared_stream`` out to each subscriber's ``filter_shared_stream``,
-and there is no way for a subscriber to tell the producer "I accepted /
-dropped / committed this event". That restricts the pattern to upstreams
-whose consumption does **not** depend on a side effect on a handle that
-only the producer holds. Good fits:
+Good fits for the shared-stream pattern:
* Idempotent / read-only reads — directory scans, polling REST APIs.
* Subscriber-side-effect cleanup, where the trigger's per-event action
(``unlink``, local marking, …) goes through APIs the subscriber owns
independently of the shared producer handle.
+* Message-broker upstreams (Kafka, SQS, Pub/Sub, Azure Service Bus) where
+ the producer must commit/delete/ack after all subscribers have processed
+ the message — use the ack channel described below.
+
+Producer-side ack channel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+For upstreams where the producer must advance (commit, delete, or ack) only
+after all subscribers have processed an event, override
+:meth:`~airflow.triggers.base.BaseEventTrigger.create_shared_stream_producer`
+to return a :class:`~airflow.triggers.shared_stream.SharedStreamProducer`.
+When this factory is overridden, the manager enters **ack mode**:
+
+1. The manager calls ``create_shared_stream_producer(kwargs)`` once per
+ shared-stream group. The returned producer owns the broker connection for
+ the lifetime of one poll; open the connection lazily inside
+ ``open_stream``, not in the factory.
+2. The producer's ``open_stream`` yields ``(event, broker_payload)`` tuples,
+ where ``broker_payload`` is whatever the producer needs later (e.g. an
+ SQS receipt handle, a Kafka offset, a Pub/Sub ack ID).
+3. Each subscriber's ``filter_shared_stream`` receives ``(event, token)``
+ pairs. The subscriber calls ``await token.ack()`` once it has accepted
+ the event, or ``await token.nack()`` to opt out.
+4. Once every subscriber in the fan-out set has resolved an event (by
+ ``ack()``, ``nack()``, unsubscribing, timing out, or overflowing its
+ queue) **and** every ``TriggerEvent`` the subscribers derived from it
+ has been persisted to the metadata database, the manager calls
+ ``await producer.advance(broker_payload, outcome)`` — commit the offset,
+ delete the SQS message, etc. The ``outcome`` is an
+ :class:`~airflow.triggers.shared_stream.AdvanceOutcome` carrying
+ per-event counts of how the subscribers resolved.
+
+**Ordering guarantee**: by default every event belongs to the same lane,
+so advance calls are dispatched strictly in event order — ``advance`` for
+event N is awaited only after event N-1's ``advance`` returned (or failed
+and was logged). A producer can override ``get_advance_lane`` to narrow
+that ordering to within a lane: events whose lane values compare equal
+are advanced in event order relative to each other, while events in
+different lanes do not wait for one another. Either way, at most one
+``advance`` call is awaited at a time. When the poll ends, the manager
+calls ``await producer.aclose()`` once, best-effort.
+
+Example — SQS-like producer:
-Currently **not** in scope: Kafka consumers (regardless of commit mode),
-SQS with delete-on-process or visibility extension, and any source where
-progress on the producer's handle is tied to the subscriber's accept /
-reject decision. These sources need a way for the subscriber to signal
-acceptance back to the producer, which the current shared-stream API does
-not provide.
+.. code-block:: python
+
+ from collections.abc import AsyncIterator
+ from typing import Any
+
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+ from airflow.triggers.shared_stream import AdvanceOutcome,
SharedStreamProducer
+
+
+ class SqsSharedStreamProducer(SharedStreamProducer):
+ def __init__(self, queue_url: str):
+ self.queue_url = queue_url
+ self.client = None
+
+ async def open_stream(self) -> AsyncIterator[tuple[Any, Any]]:
+ # Open the connection here, not in the trigger's factory.
+ self.client = await create_sqs_client()
+ while True:
+ messages = await poll_sqs(self.client, self.queue_url)
+ for msg in messages:
+ yield msg["Body"], msg["ReceiptHandle"]
+
+ async def advance(self, broker_payload: Any, outcome: AdvanceOutcome)
-> None:
+ # Called once all subscribers have resolved this message.
+ if outcome.is_clean:
+ await delete_sqs_message(self.client, self.queue_url,
broker_payload)
+ # Otherwise leave the message for the visibility timeout to
redeliver.
+
+ async def aclose(self) -> None:
+ if self.client is not None:
+ await self.client.close()
+
+
+ class SqsSharedTrigger(BaseEventTrigger):
+ def __init__(self, *, queue_url: str, region: str | None = None):
+ super().__init__()
+ self.queue_url = queue_url
+ self.region = region
+
+ def serialize(self):
+ return (
+ f"{type(self).__module__}.{type(self).__qualname__}",
+ {"queue_url": self.queue_url, "region": self.region},
+ )
+
+ def shared_stream_key(self):
+ return ("sqs", self.queue_url)
+
+ @classmethod
+ def create_shared_stream_producer(cls, kwargs) ->
SqsSharedStreamProducer:
+ return SqsSharedStreamProducer(kwargs["queue_url"])
+
+ async def filter_shared_stream(self, shared_stream):
+ async for raw, token in shared_stream:
+ if self.region is None or raw.get("region") == self.region:
+ await token.ack()
Review Comment:
It doesn't seem correct to expose `token.ack()` or `token.nack()` to
users.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]