phanikumv commented on PR #66584:
URL: https://github.com/apache/airflow/pull/66584#issuecomment-4448885109
On `_poll` in `airflow-core/src/airflow/triggers/shared_stream.py` (the `except Exception` fan-out block):
When `open_shared_stream` raises, `_poll` fans out the `_PollFailure`
sentinel and returns, but the group itself stays in `manager._groups` until
*every* existing subscriber consumes the failure, propagates it up through
`run_trigger`, and calls `unsubscribe`. That window is small but nonzero, and a
new subscriber for the same key arriving during it joins a group whose poll
task is already dead:
```python
# SharedStreamManager.subscribe
group = self._groups.get(key)
if group is None:  # only check is "does a group exist?"
    ...
    group.start()
return group.subscribe(trigger_id)  # otherwise, attach to the existing (possibly dead) group
```
The late subscriber gets a fresh `asyncio.Queue` registered in `_subscribers`, but nothing will ever put items on it because `_poll` has already returned. Its `_drain` blocks on `queue.get()` forever, and because the group is no longer empty, the existing subscribers' eventual `unsubscribe` calls won't drop the group either. Net effect: one trigger silently hangs until it is cancelled externally, with no log line and no recovery.
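A minimal asyncio reproduction of the sequence above, on a toy model. `Group`, `Manager`, and `reproduce()` are hypothetical stand-ins that mimic the described behaviour of `_poll`, `subscribe`, and `unsubscribe`; they are not the real classes in `shared_stream.py`. The `open_shared_stream` failure is simulated with a `ConnectionError`, and `asyncio.wait_for` with a short timeout stands in for the late subscriber's blocked `_drain`.

```python
from __future__ import annotations

import asyncio

# Toy model for illustration only: hypothetical stand-ins that mimic the
# behaviour described above, NOT airflow's real shared_stream implementation.


class _PollFailure:
    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


class Group:
    def __init__(self, manager: Manager, key: str) -> None:
        self.manager, self.key = manager, key
        self.subscribers: dict[str, asyncio.Queue] = {}
        self.task: asyncio.Task | None = None

    def start(self) -> None:
        self.task = asyncio.create_task(self._poll())

    def subscribe(self, trigger_id: str) -> asyncio.Queue:
        self.subscribers[trigger_id] = asyncio.Queue()
        return self.subscribers[trigger_id]

    def unsubscribe(self, trigger_id: str) -> None:
        self.subscribers.pop(trigger_id, None)
        if not self.subscribers:  # only an empty group is dropped
            self.manager.groups.pop(self.key, None)

    async def _poll(self) -> None:
        try:
            raise ConnectionError("simulated open_shared_stream failure")
        except Exception as exc:
            # Fan out the sentinel and return; the group stays registered.
            for queue in self.subscribers.values():
                queue.put_nowait(_PollFailure(exc))


class Manager:
    def __init__(self) -> None:
        self.groups: dict[str, Group] = {}

    def subscribe(self, key: str, trigger_id: str) -> asyncio.Queue:
        group = self.groups.get(key)
        if group is None:  # the only check: "does a group exist?"
            group = self.groups[key] = Group(self, key)
            group.start()
        return group.subscribe(trigger_id)


async def reproduce() -> tuple[bool, bool]:
    manager = Manager()
    early = manager.subscribe("hot-key", "t1")
    first = await early.get()  # yields so the poll task runs, fails and fans out
    assert isinstance(first, _PollFailure)

    # t2 is admitted before t1 unsubscribes, so it joins the dead group.
    late = manager.subscribe("hot-key", "t2")
    manager.groups["hot-key"].unsubscribe("t1")
    group_leaked = "hot-key" in manager.groups  # t2 keeps the group alive

    try:
        await asyncio.wait_for(late.get(), timeout=0.1)  # stands in for _drain
        hung = False
    except asyncio.TimeoutError:
        hung = True  # nothing will ever be put on t2's queue
    return group_leaked, hung


print(asyncio.run(reproduce()))  # (True, True)
```

Both flags come back `True`: the dead group outlives its original subscriber, and the late subscriber's `get()` never completes.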
The window is open whenever a new trigger for the same shared-stream key is admitted after `_PollFailure` has been broadcast but before the last existing subscriber calls `unsubscribe`. That is realistic under high trigger churn on a hot key.
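One possible guard, sketched on a toy model (the `Group`/`Manager` classes, the `is_dead()` helper, and `late_subscriber_outcome()` are hypothetical and not a patch against the real module): `subscribe` treats a group whose poll task has already finished as absent and replaces it, and `unsubscribe` only removes a group if it is still the one registered for its key, so a stale group's last `unsubscribe` cannot evict its replacement. This assumes the failure fan-out in `_poll` does not `await` between subscribers, so a group whose task is not yet done will still deliver the failure to anyone who joins it.

```python
from __future__ import annotations

import asyncio

# Hypothetical fix sketch on a toy model; NOT airflow's real shared_stream code.


class Group:
    def __init__(self, manager: Manager, key: str) -> None:
        self.manager, self.key = manager, key
        self.subscribers: dict[str, asyncio.Queue] = {}
        self.task: asyncio.Task | None = None

    def start(self) -> None:
        self.task = asyncio.create_task(self._poll())

    def is_dead(self) -> bool:
        # The poll task has exited, so no further items will be produced.
        return self.task is not None and self.task.done()

    def subscribe(self, trigger_id: str) -> asyncio.Queue:
        self.subscribers[trigger_id] = asyncio.Queue()
        return self.subscribers[trigger_id]

    def unsubscribe(self, trigger_id: str) -> None:
        self.subscribers.pop(trigger_id, None)
        # Identity check: a stale group must not evict its replacement.
        if not self.subscribers and self.manager.groups.get(self.key) is self:
            del self.manager.groups[self.key]

    async def _poll(self) -> None:
        try:
            raise ConnectionError("simulated open_shared_stream failure")
        except Exception as exc:
            # Fan out the failure to current subscribers, then return.
            for queue in self.subscribers.values():
                queue.put_nowait(exc)  # stands in for the _PollFailure sentinel


class Manager:
    def __init__(self) -> None:
        self.groups: dict[str, Group] = {}

    def subscribe(self, key: str, trigger_id: str) -> asyncio.Queue:
        group = self.groups.get(key)
        if group is None or group.is_dead():  # also replace a dead group
            group = self.groups[key] = Group(self, key)
            group.start()
        return group.subscribe(trigger_id)


async def late_subscriber_outcome() -> tuple[bool, bool]:
    manager = Manager()
    early = manager.subscribe("hot-key", "t1")
    await early.get()  # the first poll task has now failed and exited
    old_group = manager.groups["hot-key"]

    late = manager.subscribe("hot-key", "t2")  # gets a fresh group
    old_group.unsubscribe("t1")  # must not evict the fresh group
    replaced = manager.groups.get("hot-key") is not old_group

    # No hang: the fresh group's own poll delivers a result (here, its failure).
    item = await asyncio.wait_for(late.get(), timeout=1)
    return replaced, isinstance(item, Exception)


print(asyncio.run(late_subscriber_outcome()))  # (True, True)
```

With the guard, the late subscriber either streams from a live group or receives a failure it can propagate through `run_trigger`, instead of blocking forever.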
---
Drafted-by: Claude Code (Opus 4.7)