Lee-W commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3414396441


##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -220,20 +732,280 @@ async def _poll(self) -> None:
                     # Drain stale events then put the failure sentinel so every
                     # subscriber wakes up even if its queue was at capacity.
                     self._drain_and_offer_failure(queue, failure)
+            # End of synchronous section; yields are safe from here on.
+            if cancelled_ack_task is not None:
+                with suppress(asyncio.CancelledError):
+                    await cancelled_ack_task
+            if self._pump_task is not None:
+                # The pump exits by itself once it has dispatched everything
+                # already resolved and dispatchable; the suppress is defensive.
+                with suppress(asyncio.CancelledError):
+                    await self._pump_task
+            if producer is not None:
+                try:
+                    await producer.aclose()
+                except Exception as exc:
+                    self.log.warning("Producer aclose failed", key=self.key, 
exc_info=exc)
+
+    def _resolve_subscriber(
+        self,
+        *,
+        event_id: int,
+        trigger_id: int,
+        resolution: Literal["acked", "failed", "rejected"],
+    ) -> None:
+        """
+        Record one subscriber's resolution of one event.
+
+        Removes ``trigger_id`` from the pending set of ``event_id`` and adds
+        it to the matching outcome count. If the set empties, wakes the
+        advance pump — deleting the entry and dispatching the broker advance
+        are the pump's job, so advances stay in per-lane fan-out order.
+        Duplicate calls for the same (event_id, trigger_id) are no-ops, which
+        keeps every subscriber counted exactly once per event.
+
+        This is the single cleanup point for the subscriber's resolution
+        bookkeeping: the binding is popped and its unconfirmed seqs leave
+        ``_seq_index``, so a persist confirmation arriving after the fact
+        (e.g. after an ack timeout already failed the subscriber) finds
+        nothing and no-ops.
+
+        It only mutates the entry; it never deletes from ``_outstanding``.
+        """
+        entry = self._outstanding.get(event_id)
+        if entry is None or trigger_id not in entry.pending:
+            return  # already resolved, already advanced, or never existed
+        entry.pending.discard(trigger_id)
+        # Drop the binding and its seqs from the reverse index so any late
+        # persist confirmation for this subscriber is ignored.
+        binding = entry.bindings.pop(trigger_id, None)
+        if binding is not None:
+            for seq in binding.unconfirmed_seqs:
+                self._seq_index.pop(seq, None)
+        if resolution == "acked":
+            entry.acked += 1
+        elif resolution == "rejected":
+            entry.rejected += 1
+        else:
+            # resolution == "failed" (the only remaining Literal value).
+            entry.failed += 1
+        if not entry.pending:
+            # Last subscriber resolved: hand entry deletion and the ordered
+            # broker advance to the pump.
+            self._advance_wakeup.set()
+
+    def _reject_pending_event(self, *, trigger_id: int, event_id: int) -> None:
+        """
+        Resolve the subscriber's current open-window event as rejected.
+
+        Called by :func:`reject_shared_stream_event` while the subscriber's
+        binding window for ``event_id`` is open. Unlike a normal acceptance,
+        a reject resolves immediately — there is no derived trigger event, so
+        nothing has to be persisted first and the persist gate is skipped.
+
+        A subscriber that rejects an event and then still yields a real
+        trigger event for it is a subscriber-side logic error: the reject has
+        already popped the binding, so :meth:`bind_pending_event` returns
+        ``None`` and the late event simply does not bind to this raw event's
+        ack accounting (it still fires). The framework absorbs this safely
+        rather than raising.
+
+        The window itself stays recorded in ``_open_windows`` until the
+        subscriber pulls its next item; closing it then finds no binding and
+        does nothing further.
+        """
+        self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id, 
resolution="rejected")
+
+    def _maybe_complete(self, *, event_id: int, trigger_id: int) -> None:
+        """
+        Resolve the subscriber as acked once the window is closed and all
+        persist confirmations are in.
+
+        Invoked from every path that can change either condition (window
+        close, persist confirmation, unsubscribe), so whichever happens last
+        performs the resolution. No-ops if the entry or the subscriber's
+        binding is already gone.
+        """
+        entry = self._outstanding.get(event_id)
+        if entry is None:
+            return
+        binding = entry.bindings.get(trigger_id)
+        if binding is None:
+            return
+        if binding.window_closed and not binding.unconfirmed_seqs:
+            self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id, 
resolution="acked")
+
+    def _close_binding_window(self, *, event_id: int, trigger_id: int) -> None:
+        """
+        Close the subscriber's binding window for one event.
+
+        Called by ``_ack_drain`` the moment the subscriber pulls the next
+        item — trigger events the subscriber emits from here on belong to
+        the next event, and this one can now resolve for the subscriber
+        (subject to persist confirmations).
+        """
+        # Defensive: only clear the window map if it still points at this
+        # event.
+        if self._open_windows.get(trigger_id) == event_id:
+            del self._open_windows[trigger_id]
+        entry = self._outstanding.get(event_id)
+        if entry is None:
+            return
+        binding = entry.bindings.get(trigger_id)
+        if binding is None:
+            # Already resolved (rejected or force-failed); nothing to close.
+            return
+        binding.window_closed = True
+        self._maybe_complete(event_id=event_id, trigger_id=trigger_id)
+
+    def bind_pending_event(self, *, trigger_id: int, seq_counter: 
Iterator[int]) -> int | None:
+        """
+        Bind one just-emitted trigger event to the subscriber's open binding
+        window.
+
+        Returns the seq the broker advance must wait on, or ``None`` when
+        there is nothing to bind to — no window open (not in ack mode, or
+        the event was emitted outside any raw event's window) or the
+        binding already resolved (force-failed or rejected). A seq is only
+        drawn from ``seq_counter`` when the event actually binds.
+        """
+        event_id = self._open_windows.get(trigger_id)
+        if event_id is None:
+            return None
+        entry = self._outstanding.get(event_id)
+        if entry is None:
+            return None
+        binding = entry.bindings.get(trigger_id)
+        if binding is None:
+            return None
+        # Every guard passed: draw the seq now and record it on the binding
+        # (persist gate) and in the reverse index read by confirm_persisted().
+        seq = next(seq_counter)
+        binding.unconfirmed_seqs.add(seq)
+        self._seq_index[seq] = (event_id, trigger_id)
+        return seq
+
+    def confirm_persisted(self, seqs: Iterable[int]) -> None:
+        """
+        Record persist confirmations for trigger event seqs bound in this
+        group.
+
+        Seqs that are not (or no longer) in ``_seq_index`` — another group's
+        seqs, or bindings that already resolved through timeout / overflow —
+        are ignored. Because each seq is popped from the index, a repeated
+        confirmation of the same seq is ignored as well.
+        """
+        for seq in seqs:
+            bound = self._seq_index.pop(seq, None)
+            if bound is None:
+                continue
+            event_id, trigger_id = bound
+            entry = self._outstanding.get(event_id)
+            if entry is None:
+                continue
+            binding = entry.bindings.get(trigger_id)
+            if binding is None:
+                continue
+            binding.unconfirmed_seqs.discard(seq)
+            # Acks the subscriber only if its window has also closed.
+            self._maybe_complete(event_id=event_id, trigger_id=trigger_id)
+
+    def _fail_subscriber_in_outstanding(self, trigger_id: int) -> None:
+        """
+        Resolve ``trigger_id`` as failed in every outstanding entry.
+
+        A force-failed subscriber (queue overflow or ack timeout) will never
+        resolve the events it still owes; leaving it pending in older entries
+        would head-block the ordered advance pump until each entry's own ack
+        timeout. This is the retroactive counterpart of excluding the
+        subscriber from future broadcasts via ``_failed_subscribers``.
+        Entries the subscriber already resolved are left untouched, since
+        ``_resolve_subscriber`` no-ops for them.
+        """
+        # Iterate a snapshot of the keys; _resolve_subscriber mutates entries
+        # but never the dict, so the copy is purely defensive.
+        for event_id in list(self._outstanding):
+            self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id, 
resolution="failed")
+
+    async def _run_ack_timeout_loop(self) -> None:
+        """
+        Background task: force-fail subscribers whose resolution is overdue.
+
+        Cadence is ``max(0.01, ack_timeout / 10)`` — runs ten times per timeout
+        window, floored at 10 ms to avoid burning CPU when ``ack_timeout`` is
+        small.
+
+        An entry is overdue once ``ack_timeout`` has elapsed since its
+        ``created_at``. Every subscriber still pending on it is failed out of
+        all outstanding entries. A still-subscribed one is also sent an
+        ``AckTimeout`` through its queue and added to ``_failed_subscribers``.
+        The loop never returns on its own; it ends only when its task is
+        cancelled.
+        """
+        cadence = max(0.01, self._ack_timeout / 10)
+        while True:
+            await asyncio.sleep(cadence)
+            now = self._now()
+            for event_id, entry in list(self._outstanding.items()):
+                if now - entry.created_at < self._ack_timeout:
+                    continue
+                # Copy: failing a subscriber discards it from entry.pending,
+                # which must not be mutated while we iterate over it.
+                timed_out = set(entry.pending)
+                for trigger_id in timed_out:
+                    queue = self._subscribers.get(trigger_id)
+                    # No queue means the subscriber already left: there is
+                    # nobody to notify, but it is still failed out below.
+                    if queue is not None:
+                        self.log.warning(
+                            "Ack timeout; force-failing subscriber",
+                            key=self.key,
+                            trigger_id=trigger_id,
+                            event_id=event_id,
+                        )
+                        self._drain_and_offer_failure(
+                            queue,
+                            _PollFailure(
+                                AckTimeout(
+                                    f"shared stream {self.key!r} trigger 
{trigger_id} "
+                                    f"did not finish processing event 
{event_id} within "
+                                    f"{self._ack_timeout}s (still on the 
event, or its "
+                                    "trigger events not yet confirmed 
persisted)"
+                                )
+                            ),
+                        )
+                        self._failed_subscribers.add(trigger_id)
+                    # Fail the subscriber out of every outstanding entry (not
+                    # just this one); the resolution wakes the pump, which
+                    # owns entry deletion and the ordered broker advance.
+                    self._fail_subscriber_in_outstanding(trigger_id)
+
+    async def _ack_drain(self, trigger_id: int, queue: asyncio.Queue) -> 
AsyncGenerator[Any, None]:
+        """
+        Ack-mode counterpart of :func:`_drain`, tracking the binding window.
+
+        Unwraps the internal ``(raw_event, event_id)`` queue items and
+        yields the bare raw event, so the subscriber sees the same stream
+        shape as on the fast path. Between the yield and the subscriber
+        pulling the next item, the event's binding window is open: trigger
+        events the subscriber emits bind to it via
+        :meth:`bind_pending_event`. The window closes the moment the
+        subscriber resumes this generator — before we wait on the queue
+        again — so a filtered-out event (nothing yielded) resolves as soon
+        as the filter loops back for the next raw event. Closing the window
+        is the subscriber's acceptance of the event; no explicit call is
+        involved.
+
+        A ``_PollFailure`` sentinel taken from the queue is re-raised to the
+        subscriber. If the generator is closed (e.g. ``aclose()``) while
+        suspended at the ``yield``, only the ``finally`` runs: the window is
+        not closed here and its ``_open_windows`` entry remains until
+        :meth:`unsubscribe` pops it.
+        """
+        while True:
+            item = await queue.get()
+            if isinstance(item, _PollFailure):
+                # Failure sentinel: surface the wrapped exception.
+                raise item.exc
+            # Ack-mode queue items are (raw_event, event_id) pairs.
+            raw_event, event_id = item
+            self._open_windows[trigger_id] = event_id
+            # Expose the open window to reject_shared_stream_event(), which the
+            # subscriber's filter may call instead of yielding. Clear it in
+            # ``finally`` so cancellation / exceptions also clear it and no
+            # window leaks across raw events. We assign ``None`` rather than
+            # ``reset(token)`` on purpose: each ``__anext__`` of this generator
+            # runs in its caller's context, which may be a different asyncio
+            # task than the one that set the value (the filter is resumed from
+            # a fresh task in some flows), and ``ContextVar.reset`` rejects a
+            # token created in another context. Windows never nest, so clearing
+            # to ``None`` is the correct restore.
+            _reject_context.set(_RejectContext(group=self, 
trigger_id=trigger_id, event_id=event_id))
+            try:
+                yield raw_event
+            finally:
+                _reject_context.set(None)
+            # Reached only on a normal resume, i.e. the subscriber pulled again.
+            self._close_binding_window(event_id=event_id, 
trigger_id=trigger_id)
 
     def subscribe(self, trigger_id: int) -> AsyncIterator[Any]:
         """Register ``trigger_id`` as a subscriber and return its raw event 
stream."""
         if trigger_id in self._subscribers:
             raise RuntimeError(f"Trigger {trigger_id} already subscribed to 
shared stream {self.key!r}")
+        # Per-subscriber queue, bounded by ``_max_subscriber_queue``.
         queue: asyncio.Queue = 
asyncio.Queue(maxsize=self._max_subscriber_queue)
         self._subscribers[trigger_id] = queue
+        if self._ack_required:
+            # Ack mode: wrap the queue in the window-tracking drain so the
+            # subscriber's trigger events can bind to the raw event.
+            return self._ack_drain(trigger_id, queue)
         return _drain(queue)
 
     def unsubscribe(self, trigger_id: int) -> None:
+        """
+        Deregister ``trigger_id`` and resolve it on every outstanding event.
+
+        Leaving must never make the producer wait forever, but it must not
+        commit events the subscriber never saw either. Only events the
+        subscriber actually pulled resolve as accepted:
+
+        * the event it is currently sitting on (its open binding window), and
+        * events it already moved past (binding window closed).
+
+        These accepted events are still gated on persist confirmation of any
+        trigger events derived from them. An event still queued that the
+        subscriber never pulled resolves as ``failed``, so the broker
+        redelivers it instead of committing a message with no persisted
+        TriggerEvent behind it.
+        """
         # Active subscribers exit through their consuming task being cancelled
         # (Airflow's standard idiom); dropping the queue is enough here.
         self._subscribers.pop(trigger_id, None)
-        self._overflowed.discard(trigger_id)
+        self._failed_subscribers.discard(trigger_id)
+        # Capture the event the subscriber is sitting on *before* dropping the
+        # window entry; it is the one open-window event it has pulled.
+        open_event_id = self._open_windows.pop(trigger_id, None)
+        for event_id, entry in list(self._outstanding.items()):
+            binding = entry.bindings.get(trigger_id)
+            if binding is None:
+                # No live binding (already resolved, or pre-ack-mode entry);
+                # _resolve_subscriber no-ops unless still pending.
+                self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id, resolution="acked")
+                continue
+            if binding.window_closed or event_id == open_event_id:
+                # Pulled: accept, still waiting on persist confirmations of
+                # trigger events derived from this event.
+                binding.window_closed = True
+                self._maybe_complete(event_id=event_id, trigger_id=trigger_id)
+            else:
+                # Never pulled (still queued): fail so the broker redelivers.
+                self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id, resolution="failed")

Review Comment:
   `unsubscribe` no longer treats every subscriber who leaves as a clean 
acceptance. It now keys off `binding.window_closed` / the open-window event: 
only an event the subscriber actually pulled (currently sitting on it, or 
already moved past it with persist confirmations still draining) resolves as 
accepted; an event still queued that the subscriber never pulled resolves as 
`failed`, so the broker redelivers it instead of committing a message with no 
persisted TriggerEvent behind it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to