phanikumv commented on code in PR #66584:
URL: https://github.com/apache/airflow/pull/66584#discussion_r3239998887


##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -0,0 +1,212 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Shared underlying I/O between :class:`BaseEventTrigger` instances in the triggerer.
+
+When multiple triggers declare the same non-``None``
+:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key`, the
+triggerer routes them through :class:`SharedStreamManager` so that one
+underlying poll loop produces raw events that are broadcast to every
+participating trigger. Each trigger then runs
+:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` to
+convert the broadcast into its own :class:`~airflow.triggers.base.TriggerEvent`
+instances. Triggers that opt out (the default) keep their independent
+``run()``-based poll loops untouched.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncIterator, Hashable
+from contextlib import suppress
+from typing import TYPE_CHECKING, Any
+
+import structlog
+
+if TYPE_CHECKING:
+    from structlog.stdlib import BoundLogger
+
+    from airflow.triggers.base import BaseEventTrigger
+
+log = structlog.get_logger(__name__)
+
+
+class _PollFailure:
+    """Sentinel propagated through subscriber queues when the shared poll fails."""
+
+    __slots__ = ("exc",)
+
+    def __init__(self, exc: BaseException) -> None:
+        self.exc = exc
+
+
+async def _drain(queue: asyncio.Queue) -> AsyncIterator[Any]:
+    """
+    Yield items from ``queue`` until a poll failure arrives.
+
+    Subscribers exit either by their consuming task being cancelled (Airflow's
+    standard idiom — :class:`CancelledError` propagates through
+    ``queue.get()``) or by ``open_shared_stream`` raising, in which case a
+    :class:`_PollFailure` sentinel re-raises the exception here.
+    """
+    while True:
+        item = await queue.get()
+        if isinstance(item, _PollFailure):
+            raise item.exc
+        yield item
+
+
+class _SharedStreamGroup:
+    """One shared poll loop, broadcasting raw events to N subscriber queues."""
+
+    def __init__(
+        self,
+        *,
+        key: Hashable,
+        trigger_class: type[BaseEventTrigger],
+        kwargs: dict[str, Any],
+        log: BoundLogger,
+    ) -> None:
+        self.key = key
+        self.trigger_class = trigger_class
+        self.kwargs = kwargs
+        self.log = log
+        self._subscribers: dict[int, asyncio.Queue] = {}
+        self._poll_task: asyncio.Task | None = None
+
+    def start(self) -> None:
+        """Start the underlying poll loop. Call exactly once per group."""
+        if self._poll_task is not None:
+            raise RuntimeError(f"Shared stream group {self.key!r} already started")
+        self._poll_task = asyncio.create_task(
+            self._poll(),
+            name=f"shared-stream-poll[{self.key!r}]",
+        )
+
+    async def _poll(self) -> None:
+        try:
+            async for raw_event in self.trigger_class.open_shared_stream(self.kwargs):
+                if not self._subscribers:
+                    # No one to deliver to; the manager will tear us down shortly,
+                    # but we may still be racing with unsubscribe(). Drop the event
+                    # rather than buffering forever.
+                    continue
+                for queue in self._subscribers.values():
+                    queue.put_nowait(raw_event)
+        except asyncio.CancelledError:
+            raise
+        except Exception as exc:
+            self.log.exception("Shared stream poll failed; propagating to subscribers", key=self.key)
+            failure = _PollFailure(exc)
+            for queue in self._subscribers.values():
+                queue.put_nowait(failure)

Review Comment:
   When `open_shared_stream` raises, `_poll` fans out the `_PollFailure` 
sentinel and returns, but the group itself stays in `manager._groups` until 
*every* existing subscriber consumes the failure, propagates it up through 
`run_trigger`, and calls `unsubscribe`. That window is small but nonzero, and a 
new subscriber for the same key arriving during it joins a group whose poll 
task is already dead:
   
   ```python
   # SharedStreamManager.subscribe
   group = self._groups.get(key)
   if group is None:                      # only check is "does a group exist?"
       ...
       group.start()
   return group.subscribe(trigger_id)     # otherwise, attach to the existing (possibly dead) group
   ```
   
   The late subscriber gets a fresh `asyncio.Queue` registered in 
`_subscribers`, but nothing will ever put items on it — `_poll` already 
returned. Its `_drain` blocks on `queue.get()` forever, and because the late 
subscriber keeps the group non-empty, the existing subscribers' eventual `unsubscribe` calls 
won't drop the group either. Net effect: one trigger silently hangs until 
external cancellation, with no log line and no recovery.
   
   The window opens any time a new trigger for the same shared-stream key is 
admitted between the broadcast of `_PollFailure` and the last existing 
subscriber's `unsubscribe` — realistic under high trigger churn on a hot key.
   
   ---
   Drafted-by: Claude Code (Opus 4.7)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to