kaxil commented on code in PR #66584:
URL: https://github.com/apache/airflow/pull/66584#discussion_r3268774682


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1478,12 +1489,39 @@ async def run_trigger(
 
         name = self.triggers[trigger_id]["name"]
         self.log.info("trigger %s starting", name)
+
+        # Triggers that opt into a shared underlying I/O stream
+        # (BaseEventTrigger.shared_stream_key returns non-None) consume a
+        # broadcast stream produced by SharedStreamManager and convert it
+        # via filter_shared_stream(). Everything else stays on the original
+        # standalone-run() path.
+        shared_key: Hashable | None = None
+        event_trigger: BaseEventTrigger | None = None
+        if isinstance(trigger, BaseEventTrigger):
+            event_trigger = trigger
+            try:
+                shared_key = event_trigger.shared_stream_key()

Review Comment:
   `shared_stream_key()` is called here, before `render_template_fields()` runs 
at line 1514. For AssetWatcher triggers that's fine because `context is None`, 
so templates never render anyway. But if a deferred trigger ever opts into 
shared streams, fields used inside `shared_stream_key` (e.g. `self.directory`) 
will still hold unrendered template strings, and sibling triggers that render 
to the same path won't share their poll. Worth either calling 
`shared_stream_key` after `render_template_fields`, or documenting this 
constraint explicitly in `shared_stream_key`'s docstring so subclass authors 
don't accidentally key on templated attributes.
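
A minimal sketch of the failure mode, assuming a hypothetical `FakeDirectoryTrigger` whose key depends on a templated `directory` attribute. The class and its toy `render_template_fields` (plain string replacement standing in for Jinja) are illustrative only and are not Airflow code:

```python
from dataclasses import dataclass


@dataclass
class FakeDirectoryTrigger:
    """Hypothetical stand-in for a BaseEventTrigger subclass; not Airflow code."""

    directory: str  # may still hold an unrendered template string

    def render_template_fields(self, context: dict) -> None:
        # Toy substitute for Jinja rendering: replace "{{ name }}" placeholders.
        for name, value in context.items():
            self.directory = self.directory.replace("{{ " + name + " }}", value)

    def shared_stream_key(self) -> tuple:
        # Keys on a templated attribute, which is the pattern to avoid
        # while the key is computed before rendering.
        return ("directory", self.directory)


context = {"base": "/data", "ds": "2024-01-01"}
a = FakeDirectoryTrigger("{{ base }}/incoming/{{ ds }}")
b = FakeDirectoryTrigger("/data/incoming/{{ ds }}")

# Key computed before rendering: the raw template strings differ.
keys_before = (a.shared_stream_key(), b.shared_stream_key())

a.render_template_fields(context)
b.render_template_fields(context)

# Key computed after rendering: both triggers resolve to the same path.
keys_after = (a.shared_stream_key(), b.shared_stream_key())

print(keys_before[0] == keys_before[1])
print(keys_after[0] == keys_after[1])
```

In this sketch the two siblings only land in the same group when the key is computed after rendering, which is the ordering concern raised above.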



##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -0,0 +1,377 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Shared underlying I/O between :class:`BaseEventTrigger` instances in the triggerer.
+
+When multiple triggers declare the same non-``None``
+:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key`, the
+triggerer routes them through :class:`SharedStreamManager` so that one
+underlying poll loop produces raw events that are broadcast to every
+participating trigger. Each trigger then runs
+:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` to
+convert the broadcast into its own :class:`~airflow.triggers.base.TriggerEvent`
+instances. Triggers that opt out (the default) keep their independent
+``run()``-based poll loops untouched.
+
+Scope and the missing ack channel
+---------------------------------
+
+The shared-stream channel is **one-way**: events flow from
+``open_shared_stream`` out to each subscriber's ``filter_shared_stream``,
+with no path back. Subscribers cannot tell the producer "I accepted this
+event; please advance / commit / ack". The pattern is therefore only safe
+for upstreams whose consumption does not need a producer-side side effect
+tied to a subscriber's accept / reject decision:
+
+* Idempotent / read-only reads (filesystem listings, polling REST APIs).
+* Auto-commit Kafka consumers (``enable.auto.commit=true``).
+* Subscriber-side-effect cleanup (``unlink``, local marking, …) where the
+  per-event action goes through APIs the subscriber owns independently.
+
+Kafka manual-commit consumers, SQS delete-on-process / visibility
+extension, and similar message-broker patterns where progress is per-message
+and tied to the subscriber's decision are **not** in scope here today. A
+producer-side ack channel to cover them is a follow-up that should be
+designed against a concrete Kafka or SQS consumer rather than against an
+abstract API. See :class:`~airflow.triggers.base.BaseEventTrigger` for the
+matching subclass-facing notes.
+
+Lifecycle invariants
+--------------------
+
+The manager and groups cooperate to keep a single invariant true at every
+``await``-point:
+
+    A key is present in :attr:`SharedStreamManager._groups` only while its
+    group's poll task is alive and accepting new subscribers.
+
+This rules out the late-subscriber races that the naive design admits — a
+new subscriber for a key whose poll has died or is in the middle of being
+torn down always falls through to "create a fresh group" rather than
+attaching to a dead one and hanging on an empty queue. The invariant is
+maintained synchronously:
+
+* When ``_poll`` ends for any reason other than cancellation (the upstream
+  iterator raised, or returned), the group's ``finally`` block evicts the
+  key from ``_groups`` and broadcasts a terminal sentinel to current
+  subscribers — all without yielding, so no other coroutine can interleave.
+* When the last subscriber leaves, :meth:`SharedStreamManager.unsubscribe`
+  evicts the key from ``_groups`` *before* awaiting ``group.stop()``, so a
+  new subscriber arriving while we wait for cancellation creates a fresh
+  group.
+* :meth:`SharedStreamManager.stop_all` clears ``_groups`` in one synchronous
+  step before awaiting any stop, applying the same rule to shutdown.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncGenerator, AsyncIterator, Callable, Hashable
+from contextlib import suppress
+from typing import TYPE_CHECKING, Any
+
+import structlog
+
+if TYPE_CHECKING:
+    from structlog.stdlib import BoundLogger
+
+    from airflow.triggers.base import BaseEventTrigger
+
+log = structlog.get_logger(__name__)
+
+DEFAULT_SUBSCRIBER_QUEUE_MAX = 1024
+"""Default per-subscriber queue size for shared streams.
+
+The :class:`SharedStreamManager` admits up to this many unconsumed raw events
+per subscriber before treating the subscriber as too slow to keep up — at
+which point the subscriber's trigger is failed with
+:class:`_SubscriberOverflow` rather than the queue growing without bound.
+
+Used as the fallback when no value is passed to ``SharedStreamManager``;
+in the triggerer this is overridden from the
+``[triggerer] shared_stream_subscriber_queue_size`` config option.
+"""
+
+
+class _PollTerminated(Exception):
+    """
+    Raised inside subscribers when ``open_shared_stream`` returns without yielding more events.
+
+    Implementations are expected to run for the lifetime of the group; an
+    early return would otherwise leave subscribers waiting forever on an
+    empty queue.
+    """
+
+
+class _SubscriberOverflow(Exception):
+    """
+    Raised in a subscriber whose queue exceeded its maxsize.
+
+    Surfaces the slow subscriber loudly through the standard trigger-failure
+    path (rather than silently dropping events) so Airflow's retry / failure
+    semantics apply. Other subscribers in the same group are unaffected.
+    """
+
+
+class _PollFailure:
+    """Sentinel propagated through subscriber queues when the shared poll ends."""
+
+    __slots__ = ("exc",)
+
+    def __init__(self, exc: BaseException) -> None:
+        self.exc = exc
+
+
+async def _drain(queue: asyncio.Queue) -> AsyncGenerator[Any, None]:
+    """
+    Yield items from ``queue`` until a poll termination sentinel arrives.
+
+    Subscribers exit either by their consuming task being cancelled
+    (Airflow's standard idiom — :class:`CancelledError` propagates through
+    ``queue.get()``) or by the shared poll ending, in which case the
+    :class:`_PollFailure` sentinel re-raises here.
+    """
+    while True:
+        item = await queue.get()
+        if isinstance(item, _PollFailure):
+            raise item.exc
+        yield item
+
+
+class _SharedStreamGroup:
+    """One shared poll loop broadcasting raw events to N subscriber queues."""
+
+    def __init__(
+        self,
+        *,
+        key: Hashable,
+        trigger_class: type[BaseEventTrigger],
+        kwargs: dict[str, Any],
+        on_poll_terminate: Callable[[_SharedStreamGroup], None],
+        max_subscriber_queue: int,
+        log: BoundLogger,
+    ) -> None:
+        self.key = key
+        self.trigger_class = trigger_class
+        self.kwargs = kwargs
+        self.log = log
+        self._on_poll_terminate = on_poll_terminate
+        self._max_subscriber_queue = max_subscriber_queue
+        self._subscribers: dict[int, asyncio.Queue] = {}
+        self._overflowed: set[int] = set()
+        self._poll_task: asyncio.Task | None = None
+
+    def start(self) -> None:
+        """Start the underlying poll loop. Call exactly once per group."""
+        if self._poll_task is not None:
+            raise RuntimeError(f"Shared stream group {self.key!r} already started")
+        self._poll_task = asyncio.create_task(
+            self._poll(),
+            name=f"shared-stream-poll[{self.key!r}]",
+        )
+
+    async def _poll(self) -> None:
+        terminal_exc: BaseException | None = None
+        try:
+            async for raw_event in self.trigger_class.open_shared_stream(self.kwargs):
+                for trigger_id, queue in self._subscribers.items():
+                    if trigger_id in self._overflowed:
+                        # Subscriber has been force-failed on a previous
+                        # overflow; the failure sentinel is already in its
+                        # queue and unsubscribe will drop it on next pass.
+                        continue
+                    try:
+                        queue.put_nowait(raw_event)
+                    except asyncio.QueueFull:
+                        self._fail_overflowed_subscriber(trigger_id, queue)
+            terminal_exc = _PollTerminated(
+                f"open_shared_stream for {self.key!r} returned without raising; "
+                "shared streams are expected to run for the lifetime of the group"
+            )
+        except asyncio.CancelledError:
+            # ``stop()`` initiated this; the manager has already evicted the
+            # group and is awaiting our exit. Do not run the terminate path.
+            raise
+        except Exception as exc:
+            terminal_exc = exc
+            self.log.exception("Shared stream poll failed; propagating to subscribers", key=self.key)
+        finally:
+            if terminal_exc is not None:
+                # Synchronous: evict from the manager and broadcast the
+                # sentinel before returning to the loop, so no coroutine can
+                # observe ``_groups[key]`` pointing at a dead poll.
+                self._on_poll_terminate(self)
+                failure = _PollFailure(terminal_exc)
+                for queue in self._subscribers.values():
+                    queue.put_nowait(failure)

Review Comment:
   The terminal broadcast at this `put_nowait` is unguarded, unlike the inline 
broadcast at line 198 which catches `QueueFull` via 
`_fail_overflowed_subscriber`. A subscriber whose queue is already at capacity 
when the poll terminates (e.g. a `_PollFailure(_SubscriberOverflow)` sitting 
unread in a `maxsize=1` queue, or simply a slow consumer that hadn't hit 
overflow yet) will raise `QueueFull` here, aborting the loop and leaving later 
subscribers without the terminal sentinel -- they'll hang on `queue.get()` 
forever. Wrapping this in `with suppress(asyncio.QueueFull):` (or draining + 
replacing as `_fail_overflowed_subscriber` does) keeps the broadcast complete.
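
A small standalone sketch of the difference, using plain `asyncio.Queue` objects rather than the PR's classes. The helper names `broadcast_unguarded`, `broadcast_guarded`, and `make_queues` are hypothetical and exist only for this illustration:

```python
import asyncio
from contextlib import suppress


def broadcast_unguarded(queues, item):
    # Mirrors the unguarded loop: the first full queue raises QueueFull
    # and every queue after it is skipped.
    for queue in queues:
        queue.put_nowait(item)


def broadcast_guarded(queues, item):
    # Suppressing QueueFull per queue lets the loop reach every subscriber.
    for queue in queues:
        with suppress(asyncio.QueueFull):
            queue.put_nowait(item)


def make_queues():
    # First subscriber is already at capacity; the second is empty.
    full = asyncio.Queue(maxsize=1)
    full.put_nowait("unread")
    return [full, asyncio.Queue(maxsize=1)]


async def main():
    queues = make_queues()
    aborted = False
    try:
        broadcast_unguarded(queues, "terminal")
    except asyncio.QueueFull:
        aborted = True
    unguarded_second_size = queues[1].qsize()

    queues = make_queues()
    broadcast_guarded(queues, "terminal")
    guarded_second_size = queues[1].qsize()
    return aborted, unguarded_second_size, guarded_second_size


results = asyncio.run(main())
print(results)
```

Note that with `suppress` the already-full subscriber still never receives the terminal item, so it only fixes the later subscribers; draining and replacing is the stronger option if the full subscriber must also observe termination.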


