Lee-W commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3413805364
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -150,24 +150,267 @@ In other words, the savings is at the poll-loop and upstream-I/O layer, not at t
Suitable upstreams
^^^^^^^^^^^^^^^^^^
-The shared-stream channel is **one-way** today: events flow from
-``open_shared_stream`` out to each subscriber's ``filter_shared_stream``,
-and there is no way for a subscriber to tell the producer "I accepted /
-dropped / committed this event". That restricts the pattern to upstreams
-whose consumption does **not** depend on a side effect on a handle that
-only the producer holds. Good fits:
+Good fits for the shared-stream pattern:
* Idempotent / read-only reads — directory scans, polling REST APIs.
* Subscriber-side-effect cleanup, where the trigger's per-event action
(``unlink``, local marking, …) goes through APIs the subscriber owns
independently of the shared producer handle.
+* Message-broker upstreams (Kafka, SQS, Pub/Sub, Azure Service Bus) where
+ the producer must commit/delete/ack after all subscribers have processed
+ the message — use the ack channel described below.
+
+Producer-side ack channel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+For upstreams where the producer must advance (commit, delete, or ack) only
+after all subscribers have processed an event, override
+:meth:`~airflow.triggers.base.BaseEventTrigger.create_shared_stream_producer`
+to return a :class:`~airflow.triggers.shared_stream.SharedStreamProducer`.
+When this factory is overridden, the manager enters **ack mode**. The
+subscriber side does not change: ``filter_shared_stream`` receives raw
+events exactly as on the fast path, so the same filter code works in both
+modes — the framework infers when the broker may advance from each
+subscriber's consumption progress and from the persistence of the trigger
+events it derived.
+
+1. The manager calls ``create_shared_stream_producer(kwargs)`` once per
+ shared-stream group. The returned producer owns the broker connection for
+ the lifetime of one poll; open the connection lazily inside
+ ``open_stream``, not in the factory.
+2. The producer's ``open_stream`` yields ``(event, broker_payload)`` tuples,
+ where ``broker_payload`` is whatever the producer needs later (e.g. an
+ SQS receipt handle, a Kafka offset, a Pub/Sub ack ID).
+3. Each subscriber's ``filter_shared_stream`` receives raw events exactly
+ as in the fast path. A subscriber resolves an event once it has moved
+ past it (pulled the next raw event, or unsubscribed) and every
+ ``TriggerEvent`` it derived from the event has been persisted to the
+ metadata database.
+4. Once every subscriber in the fan-out set has resolved an event (by
+ moving past it, unsubscribing, timing out, or overflowing its queue),
+ the manager calls ``await producer.advance(batch)`` with the contiguous
+ prefix of fully resolved events in the event's lane — commit the
+ offsets, delete the SQS messages, etc. Each item in the batch is an
+ :class:`~airflow.triggers.shared_stream.AdvanceItem` carrying the
+ event's ``broker_payload`` and an
+ :class:`~airflow.triggers.shared_stream.AdvanceOutcome` with per-event
+ counts of how the subscribers resolved.
+
+**Rejecting an event**: a subscriber's filter can actively refuse a raw
+event instead of yielding a trigger event from it, by calling
+:func:`~airflow.triggers.shared_stream.reject_shared_stream_event` while
+processing that event. This is distinct from an involuntary failure. A
+``failed`` count means a subscriber did not finish in time (ack timeout) or
+fell behind (queue overflow) — the right response is usually redelivery. A
+``rejected`` count means a subscriber decided the event must not produce a
+trigger event and should be terminally discarded — the right response is to
+dead-letter or ``nack`` it, not redeliver. The
+:class:`~airflow.triggers.shared_stream.AdvanceOutcome` reports both
+counts separately so the producer can apply the right per-broker action in
+``advance``:
+
+* Azure Service Bus: dead-letter the message when ``rejected`` is non-zero,
+ abandon it (so the broker redelivers) when only ``failed`` is non-zero,
+ and complete it when every subscriber accepted the event.
+* Pub/Sub: ``nack`` the message on a reject, otherwise ``ack`` it.
+
+The framework only reports the counts; it never dead-letters, negatively
+acknowledges (``nack``), or redelivers a message on its own. That
+broker-specific decision lives entirely in the producer's ``advance``.
+
+``reject_shared_stream_event`` is meaningful only while the filter is
+processing a raw event in ack mode, that is, while the binding window of
+that event is open. Called on the fast path, from a standalone ``run()``,
+or between two raw events, it logs a warning and does nothing, because
+there is no pending event whose broker advance it could influence. A
+reject resolves the event immediately: there is nothing to persist, so it
+does not wait on the persistence gate.
+
+``AdvanceOutcome.is_clean`` is ``True`` only when an event has no rejects
+and no failures, i.e. every subscriber that was online at broadcast
+accepted it. A single reject or a single failure makes ``is_clean``
+``False``.
+
+Example — reject inside a filter:
-Currently **not** in scope: Kafka consumers (regardless of commit mode),
-SQS with delete-on-process or visibility extension, and any source where
-progress on the producer's handle is tied to the subscriber's accept /
-reject decision. These sources need a way for the subscriber to signal
-acceptance back to the producer, which the current shared-stream API does
-not provide.
+.. code-block:: python
+
+    from airflow.triggers.base import TriggerEvent
+    from airflow.triggers.shared_stream import reject_shared_stream_event
+
+
+    async def filter_shared_stream(self, shared_stream):
+        async for raw in shared_stream:
+            if raw.get("malformed"):
+                # Never produce a trigger event from this; have the broker
+                # dead-letter it rather than redeliver it.
+                reject_shared_stream_event()
+                continue
+            yield TriggerEvent(raw)
+
+**Ordering guarantee**: by default every event belongs to the same lane.
+The items of a batch are in event order and form their lane's contiguous
+resolved prefix. Within a lane, batches arrive strictly in order: the
+next ``advance`` is awaited only after the previous call returned (or
+failed and was logged). A producer can override ``get_advance_lane`` to
+split events into several lanes: events whose lane values compare equal
+are batched and advanced in event order relative to each other, while
+events in different lanes do not wait for one another. Either way, at
+most one ``advance`` call is awaited at a time, and cumulative schemes
+such as a Kafka offset commit only need to commit the batch's last item.
+When the poll ends, the manager calls ``await producer.aclose()`` once,
+best-effort.
+
+Example — SQS-like producer:
+
+.. code-block:: python
+
+    import json
+    from collections.abc import AsyncIterator, Sequence
+    from typing import Any
+
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+    from airflow.triggers.shared_stream import AdvanceItem, SharedStreamProducer
+
+    # create_sqs_client, poll_sqs and delete_sqs_message are placeholders for
+    # whichever SQS client library the provider uses.
+
+
+    class SqsSharedStreamProducer(SharedStreamProducer):
+        def __init__(self, queue_url: str):
+            self.queue_url = queue_url
+            self.client = None
+
+        async def open_stream(self) -> AsyncIterator[tuple[Any, Any]]:
+            # Open the connection here, not in the trigger's factory.
+            self.client = await create_sqs_client()
+            while True:
+                messages = await poll_sqs(self.client, self.queue_url)
+                for msg in messages:
+                    # SQS bodies are strings; this assumes JSON bodies and
+                    # decodes them so filters can use dict access.
+                    yield json.loads(msg["Body"]), msg["ReceiptHandle"]
+
+        async def advance(self, batch: Sequence[AdvanceItem]) -> None:
+            # Called with one lane's batch of fully resolved messages.
+            for receipt_handle, outcome in batch:
+                if outcome.is_clean:
+                    await delete_sqs_message(self.client, self.queue_url, receipt_handle)
+                # Otherwise leave the message for the visibility timeout to redeliver.
+
+        async def aclose(self) -> None:
+            if self.client is not None:
+                await self.client.close()
+
+
+    class SqsSharedTrigger(BaseEventTrigger):
+        def __init__(self, *, queue_url: str, region: str | None = None):
+            super().__init__()
+            self.queue_url = queue_url
+            self.region = region
+
+        def serialize(self):
+            return (
+                f"{type(self).__module__}.{type(self).__qualname__}",
+                {"queue_url": self.queue_url, "region": self.region},
+            )
+
+        def shared_stream_key(self):
+            return ("sqs", self.queue_url)
+
+        @classmethod
+        def create_shared_stream_producer(cls, kwargs) -> SqsSharedStreamProducer:
+            return SqsSharedStreamProducer(kwargs["queue_url"])
+
+        async def filter_shared_stream(self, shared_stream):
+            async for raw in shared_stream:
+                if self.region is None or raw.get("region") == self.region:
+                    yield TriggerEvent(raw)
+
+        async def run(self):
+            yield TriggerEvent({})
+
+Example — Kafka cumulative commit across partitions. A Kafka commit
+acknowledges every offset up to the committed one within a partition, so
+it is only safe if no later event from the same partition can be committed
+while an earlier one is still pending — events on other partitions do not
+matter. Returning ``(topic, partition)`` from ``get_advance_lane`` narrows
+the ordering guarantee to exactly that granularity: each partition's
+commits stay in order, and a slow partition no longer delays commits on
+the other partitions:
+
+.. code-block:: python
+
+    from airflow.triggers.shared_stream import SharedStreamProducer
+
+    # create_kafka_consumer is a placeholder for your Kafka client's
+    # consumer factory.
+
+
+    class KafkaSharedStreamProducer(SharedStreamProducer):
+        def __init__(self, topics: list[str]):
+            self.topics = topics
+            self.consumer = None
+
+        async def open_stream(self):
+            # Auto-commit must be off (the Kafka default is on), or the
+            # consumer commits on its own schedule and the ack channel
+            # no longer controls what the broker considers delivered.
+            self.consumer = await create_kafka_consumer(self.topics, enable_auto_commit=False)
+            async for message in self.consumer:
+                yield message.value, (message.topic, message.partition, message.offset)
+
+        def get_advance_lane(self, broker_payload):
+            topic, partition, _offset = broker_payload
+            return topic, partition
+
+        async def advance(self, batch):
+            # The batch is one lane's (here, one partition's) contiguous
+            # resolved prefix, in event order, so committing the offset
+            # after its last item covers the whole batch and can never
+            # skip past an event that is still pending.
+            topic, partition, offset = batch[-1].broker_payload
+            await self.consumer.commit(topic, partition, offset + 1)
Review Comment:
It now unpacks each `AdvanceItem` as `(broker_payload, outcome)`, routes
non-clean outcomes to a dead-letter queue, then does the cumulative commit.
Also aligned the surrounding `is_clean` prose with the new behavior that an
event with zero subscribers is not clean.
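A minimal sketch of the revised Kafka `advance` the comment describes (dead-letter every non-clean item, then do one cumulative commit). This is an illustration under stated assumptions, not the PR's code: `Outcome` and `Item` are hypothetical stand-ins for `AdvanceOutcome` and `AdvanceItem` (only `broker_payload`, `rejected`, `failed` and `is_clean` appear in the diff; `accepted` is assumed), `RecordingConsumer` replaces a real Kafka consumer, and `send_to_dead_letter_queue` is a hypothetical helper. The zero-subscriber rule is modeled by requiring at least one accept for `is_clean`.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, NamedTuple


@dataclass(frozen=True)
class Outcome:
    """Hypothetical stand-in for AdvanceOutcome (field names assumed)."""

    accepted: int = 0
    rejected: int = 0
    failed: int = 0

    @property
    def is_clean(self) -> bool:
        # No rejects, no failures, and at least one accept: an event that
        # no subscriber consumed is not clean.
        return self.accepted > 0 and self.rejected == 0 and self.failed == 0


class Item(NamedTuple):
    """Hypothetical stand-in for AdvanceItem; unpacks as (payload, outcome)."""

    broker_payload: Any
    outcome: Outcome


class RecordingConsumer:
    """Placeholder for a Kafka consumer: records commits instead of sending them."""

    def __init__(self) -> None:
        self.commits: list[tuple[str, int, int]] = []

    async def commit(self, topic: str, partition: int, offset: int) -> None:
        self.commits.append((topic, partition, offset))


class KafkaAdvanceSketch:
    def __init__(self, consumer: RecordingConsumer) -> None:
        self.consumer = consumer
        self.dead_letters: list[Any] = []

    async def send_to_dead_letter_queue(self, broker_payload: Any) -> None:
        # Hypothetical helper: a real producer would publish to a DLQ topic.
        self.dead_letters.append(broker_payload)

    async def advance(self, batch: list[Item]) -> None:
        # 1. Route every non-clean event to the dead-letter queue.
        for broker_payload, outcome in batch:
            if not outcome.is_clean:
                await self.send_to_dead_letter_queue(broker_payload)
        # 2. Cumulative commit: the batch is one partition's contiguous
        #    resolved prefix, so committing after its last offset covers it.
        topic, partition, offset = batch[-1].broker_payload
        await self.consumer.commit(topic, partition, offset + 1)
```

Dead-lettering before the commit means a rejected or failed message is preserved somewhere before the partition offset moves past it, which is the only way to keep it once a cumulative commit has been issued.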
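The per-broker routing listed in the diff (Azure Service Bus: dead-letter on a reject, abandon on failures only, otherwise complete; Pub/Sub: `nack` on a reject, otherwise `ack`) reduces to a small decision on the two counts. This is a sketch, not the Airflow or Azure SDK API: `Outcome` is a hypothetical stand-in for `AdvanceOutcome`, and a real producer would map the returned action onto its broker client (for `azure-servicebus`, the receiver's `complete_message`, `abandon_message` and `dead_letter_message`).

```python
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class Outcome:
    """Hypothetical stand-in for AdvanceOutcome (field names assumed)."""

    accepted: int = 0
    rejected: int = 0
    failed: int = 0


class ServiceBusAction(Enum):
    DEAD_LETTER = "dead_letter"
    ABANDON = "abandon"
    COMPLETE = "complete"


def service_bus_action(outcome: Outcome) -> ServiceBusAction:
    # A reject is terminal: dead-letter even if another subscriber failed.
    if outcome.rejected:
        return ServiceBusAction.DEAD_LETTER
    # Only involuntary failures: abandon so the broker redelivers.
    if outcome.failed:
        return ServiceBusAction.ABANDON
    return ServiceBusAction.COMPLETE


def pubsub_should_ack(outcome: Outcome) -> bool:
    # Pub/Sub: nack on any reject, otherwise ack (failures included).
    return outcome.rejected == 0
```

Checking `rejected` before `failed` encodes the rule that a deliberate reject wins over a timeout or overflow on the same event.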
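The lane rule in the diff (one lane by default, `get_advance_lane` splitting events into lanes whose batches are each a contiguous resolved prefix) can be checked with a toy model. This illustrates the documented behavior only and is not the manager's implementation; `resolved_prefix_batches` is a hypothetical name, and the payloads mimic the Kafka `(topic, partition, offset)` tuples.

```python
from collections.abc import Callable, Hashable, Sequence
from typing import Any


def resolved_prefix_batches(
    payloads: Sequence[Any],
    resolved: Sequence[bool],
    get_lane: Callable[[Any], Hashable] = lambda _payload: None,
) -> dict[Hashable, list[Any]]:
    """Return each lane's contiguous resolved prefix, in event order.

    Toy model of the documented rule: an unresolved event holds back
    later events in its own lane only.
    """
    lanes: dict[Hashable, list[tuple[Any, bool]]] = {}
    for payload, done in zip(payloads, resolved):
        # Default lane is None for every event, i.e. a single shared lane.
        lanes.setdefault(get_lane(payload), []).append((payload, done))

    batches: dict[Hashable, list[Any]] = {}
    for lane, events in lanes.items():
        prefix = []
        for payload, done in events:
            if not done:
                break  # later events in this lane must wait
            prefix.append(payload)
        if prefix:
            batches[lane] = prefix
    return batches
```

With events `("t", 0, 5)`, `("t", 1, 7)`, `("t", 0, 6)`, `("t", 1, 8)`, `("t", 0, 7)` and only offsets 7 on partition 1 and 7 on partition 0 unresolved, the single default lane can advance only offset 5. Per-partition lanes let partition 0 advance offsets 5 and 6 (a cumulative commit at 7) while partition 1 waits.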