Lee-W commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3409076836
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -163,17 +445,45 @@ def __init__(
kwargs: dict[str, Any],
on_poll_terminate: Callable[[_SharedStreamGroup], None],
max_subscriber_queue: int,
+ ack_timeout: float,
log: BoundLogger,
+ _now: Callable[[], float] = time.monotonic,
) -> None:
self.key = key
self.trigger_class = trigger_class
self.kwargs = kwargs
self.log = log
self._on_poll_terminate = on_poll_terminate
self._max_subscriber_queue = max_subscriber_queue
+ self._ack_timeout = ack_timeout
+ self._now = _now
self._subscribers: dict[int, asyncio.Queue] = {}
- self._overflowed: set[int] = set()
+ # Subscribers already force-failed (queue overflow or ack timeout);
+ # excluded from subsequent broadcasts until they unsubscribe.
+ self._failed_subscribers: set[int] = set()
self._poll_task: asyncio.Task | None = None
+ # Constant for the group's lifetime: trigger_class never changes.
+ self._ack_required: bool = self._is_ack_required()
+ # Ack mode state — populated only when create_shared_stream_producer
+ # is overridden.
+ self._outstanding: dict[int, _OutstandingEntry] = {}
+ # Per-lane FIFO index over _outstanding: event ids in fan-out order,
+ # keyed by the lane each event's producer assigned at fan-out.
+ self._lane_queues: dict[Hashable, deque[int]] = {}
+ # seq -> (event_id, trigger_id) for trigger events awaiting persist
+ # confirmation; entries are removed on confirmation or when the
+ # owning binding resolves (late confirmations then no-op).
+ self._seq_index: dict[int, tuple[int, int]] = {}
+ # Per subscriber: the token whose binding window is currently open —
+ # trigger events the subscriber emits now bind to this token.
+ self._current_token: dict[int, AckToken] = {}
+ self._next_event_id: int = 0
+ self._ack_timeout_task: asyncio.Task | None = None
+ # Advance pump: a single task dispatches broker advances in per-lane
+ # fan-out order, woken through this event whenever an entry resolves.
+ self._advance_wakeup: asyncio.Event = asyncio.Event()
+ self._pump_stopping: bool = False
+ self._pump_task: asyncio.Task | None = None
Review Comment:
yep, it's now updated as part of the ack/nack refactoring
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]