Lee-W commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3407479814
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -220,20 +674,264 @@ async def _poll(self) -> None:
# Drain stale events then put the failure sentinel so every
# subscriber wakes up even if its queue was at capacity.
self._drain_and_offer_failure(queue, failure)
+ # End of synchronous section; yields are safe from here on.
+ if cancelled_ack_task is not None:
+ with suppress(asyncio.CancelledError):
+ await cancelled_ack_task
+ if self._pump_task is not None:
+ # The pump exits by itself once it has dispatched everything
+ # already resolved and dispatchable; the suppress is defensive.
+ with suppress(asyncio.CancelledError):
+ await self._pump_task
+ if producer is not None:
+ try:
+ await producer.aclose()
+ except Exception as exc:
+ self.log.warning("Producer aclose failed", key=self.key,
exc_info=exc)
+
+ def _resolve_subscriber(
+ self,
+ *,
+ event_id: int,
+ trigger_id: int,
+ resolution: Literal["acked", "nacked", "failed"],
+ ) -> None:
+ """
+ Record one subscriber's resolution of one event.
+
+ Removes ``trigger_id`` from the pending set of ``event_id`` and adds
+ it to the matching outcome count. If the set empties, wakes the
+ advance pump — deleting the entry and dispatching the broker advance
+ are the pump's job, so advances stay in per-lane fan-out order.
Duplicate
+ calls for the same (event_id, trigger_id) are no-ops, which keeps
+ every subscriber counted exactly once per event.
+
+ This is the single cleanup point for the subscriber's ack
+ bookkeeping: the binding is popped and its unconfirmed seqs leave
+ ``_seq_index``, so a persist confirmation arriving after the fact
+ (e.g. after an ack timeout already failed the subscriber) finds
+ nothing and no-ops.
+ """
+ entry = self._outstanding.get(event_id)
+ if entry is None or trigger_id not in entry.pending:
+ return # already resolved, already advanced, or never existed
+ entry.pending.discard(trigger_id)
+ binding = entry.bindings.pop(trigger_id, None)
+ if binding is not None:
+ for seq in binding.unconfirmed_seqs:
+ self._seq_index.pop(seq, None)
+ if resolution == "acked":
+ entry.acked += 1
+ elif resolution == "nacked":
+ entry.nacked += 1
+ else:
+ entry.failed += 1
+ if not entry.pending:
+ self._advance_wakeup.set()
+
+ def _record_ack(self, *, event_id: int, trigger_id: int) -> None:
+ """
+ Record that the subscriber called ``ack()`` for one event.
+
+ The subscriber resolves as acked only once the binding window is
+ closed and every bound seq is confirmed persisted; until then the
+ event stays outstanding (the ack timeout is the backstop).
+ """
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ return
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return # already resolved (e.g. force-failed before the ack
landed)
+ binding.acked = True
+ self._maybe_complete(event_id=event_id, trigger_id=trigger_id)
+
+ def _record_nack(self, *, event_id: int, trigger_id: int) -> None:
+ """Resolve the subscriber as nacked immediately; no persistence gating
applies."""
+ self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id,
resolution="nacked")
+
+ def _maybe_complete(self, *, event_id: int, trigger_id: int) -> None:
+ """Resolve the subscriber as acked once ack, window close, and all
persist confirmations are in."""
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ return
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return
+ if binding.acked and binding.window_closed and not
binding.unconfirmed_seqs:
+ self._resolve_subscriber(event_id=event_id, trigger_id=trigger_id,
resolution="acked")
+
+ def _close_binding_window(self, token: AckToken) -> None:
+ """
+ Close the binding window of one yielded token.
+
+ Called by ``_ack_drain`` the moment the subscriber pulls the next
+ item — trigger events the subscriber emits from here on belong to
+ the next token, and an ack recorded for this one can now complete
+ (subject to persist confirmations).
+ """
+ trigger_id = token._trigger_id
+ if self._current_token.get(trigger_id) is token:
+ del self._current_token[trigger_id]
+ entry = self._outstanding.get(token._event_id)
+ if entry is None:
+ return
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return
+ binding.window_closed = True
+ self._maybe_complete(event_id=token._event_id, trigger_id=trigger_id)
+
+ def bind_pending_event(self, *, trigger_id: int, seq_counter:
Iterator[int]) -> int | None:
+ """
+ Bind one just-emitted trigger event to the subscriber's open binding
window.
+
+ Returns the seq the broker advance must wait on, or ``None`` when
+ there is nothing to bind to — no window open (not in ack mode, or
+ the event was emitted outside any raw event's window) or the
+ binding already resolved (nacked or force-failed). A seq is only
+ drawn from ``seq_counter`` when the event actually binds.
+ """
+ token = self._current_token.get(trigger_id)
+ if token is None:
+ return None
+ entry = self._outstanding.get(token._event_id)
+ if entry is None:
+ return None
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ return None
+ seq = next(seq_counter)
+ binding.unconfirmed_seqs.add(seq)
+ self._seq_index[seq] = (token._event_id, trigger_id)
+ return seq
+
+ def confirm_persisted(self, seqs: Iterable[int]) -> None:
+ """
+ Record persist confirmations for trigger event seqs bound in this
group.
+
+ Seqs that are not (or no longer) in ``_seq_index`` — another group's
+ seqs, or bindings that already resolved through nack / timeout —
+ are ignored.
+ """
+ for seq in seqs:
+ bound = self._seq_index.pop(seq, None)
+ if bound is None:
+ continue
+ event_id, trigger_id = bound
+ entry = self._outstanding.get(event_id)
+ if entry is None:
+ continue
+ binding = entry.bindings.get(trigger_id)
+ if binding is None:
+ continue
Review Comment:
Yep, I thought it would make the statement too long, but it turns out to be a
good idea 🤔 Will push it. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]