Lee-W commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3409076836


##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -163,17 +445,45 @@ def __init__(
         kwargs: dict[str, Any],
         on_poll_terminate: Callable[[_SharedStreamGroup], None],
         max_subscriber_queue: int,
+        ack_timeout: float,
         log: BoundLogger,
+        _now: Callable[[], float] = time.monotonic,
     ) -> None:
         self.key = key
         self.trigger_class = trigger_class
         self.kwargs = kwargs
         self.log = log
         self._on_poll_terminate = on_poll_terminate
         self._max_subscriber_queue = max_subscriber_queue
+        self._ack_timeout = ack_timeout
+        self._now = _now
         self._subscribers: dict[int, asyncio.Queue] = {}
-        self._overflowed: set[int] = set()
+        # Subscribers already force-failed (queue overflow or ack timeout);
+        # excluded from subsequent broadcasts until they unsubscribe.
+        self._failed_subscribers: set[int] = set()
         self._poll_task: asyncio.Task | None = None
+        # Constant for the group's lifetime: trigger_class never changes.
+        self._ack_required: bool = self._is_ack_required()
+        # Ack mode state — populated only when create_shared_stream_producer
+        # is overridden.
+        self._outstanding: dict[int, _OutstandingEntry] = {}
+        # Per-lane FIFO index over _outstanding: event ids in fan-out order,
+        # keyed by the lane each event's producer assigned at fan-out.
+        self._lane_queues: dict[Hashable, deque[int]] = {}
+        # seq -> (event_id, trigger_id) for trigger events awaiting persist
+        # confirmation; entries are removed on confirmation or when the
+        # owning binding resolves (late confirmations then no-op).
+        self._seq_index: dict[int, tuple[int, int]] = {}
+        # Per subscriber: the token whose binding window is currently open —
+        # trigger events the subscriber emits now bind to this token.
+        self._current_token: dict[int, AckToken] = {}
+        self._next_event_id: int = 0
+        self._ack_timeout_task: asyncio.Task | None = None
+        # Advance pump: a single task dispatches broker advances in per-lane
+        # fan-out order, woken through this event whenever an entry resolves.
+        self._advance_wakeup: asyncio.Event = asyncio.Event()
+        self._pump_stopping: bool = False
+        self._pump_task: asyncio.Task | None = None

Review Comment:
   yep, it's now updated as part of the ack/nack refactoring



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to