Lee-W commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3402167068


##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -150,24 +150,196 @@ In other words, the savings is at the poll-loop and upstream-I/O layer, not at t
 Suitable upstreams
 ^^^^^^^^^^^^^^^^^^
 
-The shared-stream channel is **one-way** today: events flow from
-``open_shared_stream`` out to each subscriber's ``filter_shared_stream``,
-and there is no way for a subscriber to tell the producer "I accepted /
-dropped / committed this event". That restricts the pattern to upstreams
-whose consumption does **not** depend on a side effect on a handle that
-only the producer holds. Good fits:
+Good fits for the shared-stream pattern:
 
 * Idempotent / read-only reads — directory scans, polling REST APIs.
 * Subscriber-side-effect cleanup, where the trigger's per-event action
   (``unlink``, local marking, …) goes through APIs the subscriber owns
   independently of the shared producer handle.
+* Message-broker upstreams (Kafka, SQS, Pub/Sub, Azure Service Bus) where
+  the producer must commit/delete/ack after all subscribers have processed
+  the message — use the ack channel described below.
+
+Producer-side ack channel
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+For upstreams where the producer must advance (commit, delete, or ack) only
+after all subscribers have processed an event, override
+:meth:`~airflow.triggers.base.BaseEventTrigger.create_shared_stream_producer`
+to return a :class:`~airflow.triggers.shared_stream.SharedStreamProducer`.
+When this factory is overridden, the manager enters **ack mode**:
+
+1. The manager calls ``create_shared_stream_producer(kwargs)`` once per
+   shared-stream group. The returned producer owns the broker connection for
+   the lifetime of one poll; open the connection lazily inside
+   ``open_stream``, not in the factory.
+2. The producer's ``open_stream`` yields ``(event, broker_payload)`` tuples,
+   where ``broker_payload`` is whatever the producer needs later (e.g. an
+   SQS receipt handle, a Kafka offset, a Pub/Sub ack ID).
+3. Each subscriber's ``filter_shared_stream`` receives ``(event, token)``
+   pairs. The subscriber calls ``await token.ack()`` once it has accepted
+   the event, or ``await token.nack()`` to opt out.
+4. Once every subscriber in the fan-out set has resolved an event (by
+   ``ack()``, ``nack()``, unsubscribing, timing out, or overflowing its
+   queue), the manager calls
+   ``await producer.advance(broker_payload, outcome)`` — commit the offset,
+   delete the SQS message, etc. The ``outcome`` is an
+   :class:`~airflow.triggers.shared_stream.AdvanceOutcome` carrying
+   per-event counts of how the subscribers resolved.
+
+**Ordering guarantee**: by default every event belongs to the same lane,
+so advance calls are dispatched strictly in event order — ``advance`` for
+event N is awaited only after event N-1's ``advance`` returned (or failed
+and was logged). A producer can override ``get_advance_lane`` to narrow
+that ordering to within a lane: events whose lane values compare equal
+are advanced in event order relative to each other, while events in
+different lanes do not wait for one another. Either way, at most one
+``advance`` call is awaited at a time. When the poll ends, the manager
+calls ``await producer.aclose()`` once, best-effort.
+
+Example — SQS-like producer:
 
-Currently **not** in scope: Kafka consumers (regardless of commit mode),
-SQS with delete-on-process or visibility extension, and any source where
-progress on the producer's handle is tied to the subscriber's accept /
-reject decision. These sources need a way for the subscriber to signal
-acceptance back to the producer, which the current shared-stream API does
-not provide.
+.. code-block:: python
+
+    from collections.abc import AsyncIterator
+    from typing import Any
+
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+    from airflow.triggers.shared_stream import AdvanceOutcome, SharedStreamProducer
+
+
+    class SqsSharedStreamProducer(SharedStreamProducer):
+        def __init__(self, queue_url: str):
+            self.queue_url = queue_url
+            self.client = None
+
+        async def open_stream(self) -> AsyncIterator[tuple[Any, Any]]:
+            # Open the connection here, not in the trigger's factory.
+            self.client = await create_sqs_client()
+            while True:
+                messages = await poll_sqs(self.client, self.queue_url)
+                for msg in messages:
+                    yield msg["Body"], msg["ReceiptHandle"]
+
+        async def advance(self, broker_payload: Any, outcome: AdvanceOutcome) -> None:
+            # Called once all subscribers have resolved this message.
+            if outcome.is_clean:
+                await delete_sqs_message(self.client, self.queue_url, broker_payload)
+            # Otherwise leave the message for the visibility timeout to redeliver.
+
+        async def aclose(self) -> None:
+            if self.client is not None:
+                await self.client.close()
+
+
+    class SqsSharedTrigger(BaseEventTrigger):
+        def __init__(self, *, queue_url: str, region: str | None = None):
+            super().__init__()
+            self.queue_url = queue_url
+            self.region = region
+
+        def serialize(self):
+            return (
+                f"{type(self).__module__}.{type(self).__qualname__}",
+                {"queue_url": self.queue_url, "region": self.region},
+            )
+
+        def shared_stream_key(self):
+            return ("sqs", self.queue_url)
+
+        @classmethod
+        def create_shared_stream_producer(cls, kwargs) -> SqsSharedStreamProducer:
+            return SqsSharedStreamProducer(kwargs["queue_url"])
+
+        async def filter_shared_stream(self, shared_stream):
+            async for raw, token in shared_stream:
+                if self.region is None or raw.get("region") == self.region:
+                    await token.ack()
+                    yield TriggerEvent(raw)
+                else:
+                    await token.nack()
+
+        async def run(self):  # pragma: no cover
+            yield TriggerEvent({})
+
+Example — Kafka cumulative commit across partitions. A Kafka commit
+acknowledges every offset up to the committed one within a partition, so
+it is only safe if no later event from the same partition can be committed
+while an earlier one is still pending — events on other partitions do not
+matter. Returning ``(topic, partition)`` from ``get_advance_lane`` narrows
+the ordering guarantee to exactly that granularity: each partition's
+commits stay in order, and a slow partition no longer delays commits on
+the other partitions:
+
+.. code-block:: python
+
+    class KafkaSharedStreamProducer(SharedStreamProducer):
+        def __init__(self, topics: list[str]):
+            self.topics = topics
+            self.consumer = None
+
+        async def open_stream(self):
+            self.consumer = await create_kafka_consumer(self.topics)
+            async for message in self.consumer:
+                yield message.value, (message.topic, message.partition, message.offset)
+
+        def get_advance_lane(self, broker_payload):
+            topic, partition, _offset = broker_payload
+            return topic, partition
+
+        async def advance(self, broker_payload, outcome):
+            topic, partition, offset = broker_payload

Review Comment:
   Just updated accordingly. Please take another look. Thanks!
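The ack-mode steps in the diff above can be modelled with a small, self-contained asyncio toy. The sketch fans each `(event, broker_payload)` out to every subscriber, gives each subscriber its own token, and calls `advance` exactly once after all tokens are resolved or abandoned. `ToyOutcome`, `ToyToken`, `run_ack_mode` and `demo` are hypothetical names and are not the PR's API. The sketch also handles events one at a time, which the real manager need not do.

It assumes that a `nack` counts as a clean opt-out, so only subscribers that never resolved (standing in for a timeout, unsubscribe or queue overflow) make `is_clean` false. The real `AdvanceOutcome` fields and the definition of `is_clean` come from the PR and may differ.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyOutcome:
    """Hypothetical stand-in for AdvanceOutcome: per-event resolution counts."""

    acked: int = 0
    nacked: int = 0
    unresolved: int = 0  # stands in for timeout / unsubscribe / queue overflow

    @property
    def is_clean(self) -> bool:
        # Assumption for this sketch: a nack is a clean opt-out, so only
        # subscribers that never resolved make the outcome "unclean".
        return self.unresolved == 0


class ToyToken:
    """Hypothetical per-(event, subscriber) token; only the first call counts."""

    def __init__(self, outcome: ToyOutcome) -> None:
        self._outcome = outcome
        self.done = False

    async def ack(self) -> None:
        if not self.done:
            self.done = True
            self._outcome.acked += 1

    async def nack(self) -> None:
        if not self.done:
            self.done = True
            self._outcome.nacked += 1


async def run_ack_mode(events, subscribers, advance) -> None:
    """Fan each (event, payload) out to every subscriber, then advance once."""
    for event, payload in events:
        outcome = ToyOutcome()
        tokens = [ToyToken(outcome) for _ in subscribers]
        await asyncio.gather(*(sub(event, tok) for sub, tok in zip(subscribers, tokens)))
        # Every subscriber has now either resolved its token or given up on it.
        outcome.unresolved = sum(not tok.done for tok in tokens)
        await advance(payload, outcome)


async def demo(subscribers):
    advanced = []

    async def advance(payload, outcome):
        advanced.append((payload, outcome.acked, outcome.nacked, outcome.is_clean))

    events = [({"region": "eu"}, "receipt-1"), ({"region": "us"}, "receipt-2")]
    await run_ack_mode(events, subscribers, advance)
    return advanced


async def takes_all(event, token):
    await token.ack()


async def eu_only(event, token):
    if event["region"] == "eu":
        await token.ack()
    else:
        await token.nack()


async def never_resolves(event, token):
    pass  # stands in for a subscriber that timed out or unsubscribed


result = asyncio.run(demo([takes_all, eu_only]))
print(result)  # [('receipt-1', 2, 0, True), ('receipt-2', 1, 1, True)]
result_with_straggler = asyncio.run(demo([takes_all, never_resolves]))
```

In this toy, the region filter's `nack` still lets the message be advanced cleanly. A subscriber that never resolves turns every outcome unclean, which in the SQS example would leave the message for redelivery.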

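The ordering guarantee paragraph in the diff (one lane by default, optional narrowing via `get_advance_lane`, at most one `advance` awaited at a time) can be sketched as a per-lane FIFO drained under a single lock. `LaneOrderedAdvancer` and its `register`/`resolve` methods are hypothetical and are not the manager's real implementation. The sketch only shows the dispatch rule: a payload is advanced once it and every earlier payload in its lane are resolved, and a failed `advance` is logged and skipped.

```python
import asyncio
from collections import deque


class LaneOrderedAdvancer:
    """Hypothetical sketch of per-lane, in-order advance dispatch.

    Payloads are registered in stream order; ``resolve`` may be called in any
    order. A payload is advanced only after every earlier payload in the same
    lane has been advanced, and one lock keeps a single advance in flight.
    """

    def __init__(self, advance, get_lane=None):
        self._advance = advance
        # Default: every payload maps to the same lane -> strict stream order.
        self._get_lane = get_lane or (lambda payload: None)
        self._lanes = {}  # lane -> deque of pending sequence numbers
        self._payloads = {}  # sequence number -> payload
        self._resolved = set()
        self._lock = asyncio.Lock()
        self._next_seq = 0

    def register(self, payload) -> int:
        seq = self._next_seq
        self._next_seq += 1
        self._payloads[seq] = payload
        self._lanes.setdefault(self._get_lane(payload), deque()).append(seq)
        return seq

    async def resolve(self, seq: int) -> None:
        self._resolved.add(seq)
        queue = self._lanes[self._get_lane(self._payloads[seq])]
        async with self._lock:  # at most one advance awaited at a time
            while queue and queue[0] in self._resolved:
                head = queue.popleft()
                self._resolved.discard(head)
                try:
                    await self._advance(self._payloads.pop(head))
                except Exception as exc:  # log and move on to the next payload
                    print(f"advance failed: {exc!r}")


async def demo(get_lane=None):
    advanced = []

    async def advance(payload):
        advanced.append(payload)

    advancer = LaneOrderedAdvancer(advance, get_lane)
    # (topic, partition, offset), registered in the order the stream yields them.
    payloads = [("orders", 0, 10), ("orders", 1, 20), ("orders", 0, 11)]
    seqs = [advancer.register(p) for p in payloads]
    for seq in (seqs[2], seqs[1], seqs[0]):  # subscribers finish out of order
        await advancer.resolve(seq)
    return advanced


strict = asyncio.run(demo())
per_partition = asyncio.run(demo(get_lane=lambda p: (p[0], p[1])))
print(strict)         # [('orders', 0, 10), ('orders', 1, 20), ('orders', 0, 11)]
print(per_partition)  # [('orders', 1, 20), ('orders', 0, 10), ('orders', 0, 11)]
```

With the default single lane, the slow first event holds back everything behind it. With `(topic, partition)` lanes, partition 1 advances immediately while partition 0 still commits 10 before 11.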

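The Kafka paragraph in the diff reasons that a commit covers every earlier offset in its partition. A small, hypothetical helper makes that concrete. `cumulative_commit_position` is not part of the PR or of any Kafka client; it relies only on the standard Kafka convention that the committed offset is the next offset to read. It shows how far one partition's commit could safely move given a set of finished offsets, which is exactly the hazard that per-partition in-order advancing avoids.

```python
def cumulative_commit_position(start: int, finished: set[int]) -> int:
    """Return how far a Kafka-style cumulative commit can safely move.

    Kafka's committed offset is the *next* offset to read, and committing it
    implicitly acknowledges every earlier offset in that partition. So the
    position may only advance across a contiguous run of finished offsets
    beginning at ``start`` (the partition's current committed position).
    """
    position = start
    while position in finished:
        position += 1
    return position


# Offset 10 is still pending, so committing 12 here would silently ack 10 too.
print(cumulative_commit_position(10, {11}))      # 10
print(cumulative_commit_position(10, {10, 11}))  # 12
print(cumulative_commit_position(10, {10, 12}))  # 11
```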

-- 
This is an automated message from the Apache Git Service.