phanikumv commented on PR #66584:
URL: https://github.com/apache/airflow/pull/66584#issuecomment-4448885109

   On `_poll` in `airflow-core/src/airflow/triggers/shared_stream.py` (the 
`except Exception` fan-out block) —
   
   When `open_shared_stream` raises, `_poll` fans out the `_PollFailure` 
sentinel and returns, but the group itself stays in `manager._groups` until 
*every* existing subscriber consumes the failure, propagates it up through 
`run_trigger`, and calls `unsubscribe`. That window is small but nonzero, and a 
new subscriber for the same key arriving during it joins a group whose poll 
task is already dead:
   
   ```python
   # SharedStreamManager.subscribe
   group = self._groups.get(key)
   if group is None:                      # only check is "does a group exist?"
       ...
       group.start()
   return group.subscribe(trigger_id)     # otherwise, attach to the existing (possibly dead) group
   ```
   
   The late subscriber gets a fresh `asyncio.Queue` registered in 
`_subscribers`, but nothing will ever put items on it — `_poll` already 
returned. Its `_drain` blocks on `queue.get()` forever, and because the group 
is no longer empty, the existing subscribers' eventual `unsubscribe` calls 
won't drop the group either. Net effect: one trigger silently hangs until 
external cancellation, with no log line and no recovery.
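To make the hang concrete, here is a toy asyncio model of the structure described above. `ToyGroup`, `ToyManager`, `failing_stream`, and `demo` are hypothetical, simplified stand-ins, not the actual classes in `shared_stream.py`. Only the general shape comes from this comment: a `_groups` dict keyed by stream key, per-subscriber queues in `_subscribers`, and a `_poll` task that fans out `_PollFailure` and returns. A 0.1 s timeout stands in for "forever".

```python
from __future__ import annotations

import asyncio


class _PollFailure:
    """Toy sentinel fanned out when the poll task fails."""

    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


class ToyGroup:
    """Hypothetical stand-in for a shared-stream group (not Airflow's class)."""

    def __init__(self, open_stream) -> None:
        self._open_stream = open_stream
        self._subscribers: dict[str, asyncio.Queue] = {}
        self._task: asyncio.Task | None = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._poll())

    def subscribe(self, trigger_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[trigger_id] = queue
        return queue

    async def _poll(self) -> None:
        try:
            async for item in self._open_stream():
                for queue in list(self._subscribers.values()):
                    queue.put_nowait(item)
        except Exception as exc:
            # Fan the failure out to the *current* subscribers, then return.
            # Nothing feeds any queue in this group after this point.
            for queue in list(self._subscribers.values()):
                queue.put_nowait(_PollFailure(exc))


class ToyManager:
    """Hypothetical manager mirroring the subscribe/unsubscribe shape above."""

    def __init__(self, open_stream) -> None:
        self._open_stream = open_stream
        self._groups: dict[str, ToyGroup] = {}

    def subscribe(self, key: str, trigger_id: str) -> asyncio.Queue:
        group = self._groups.get(key)
        if group is None:  # only check is "does a group exist?"
            group = ToyGroup(self._open_stream)
            self._groups[key] = group
            group.start()
        return group.subscribe(trigger_id)  # may attach to a dead group

    def unsubscribe(self, key: str, trigger_id: str) -> None:
        group = self._groups[key]
        group._subscribers.pop(trigger_id, None)
        if not group._subscribers:  # drop the group only once it is empty
            del self._groups[key]


async def failing_stream():
    raise RuntimeError("connection refused")
    yield  # unreachable; makes this an async generator


async def demo() -> tuple[bool, bool, bool]:
    manager = ToyManager(failing_stream)
    first = manager.subscribe("hot-key", "trigger-1")
    failure = await first.get()  # existing subscriber receives the sentinel
    # A new trigger for the same key arrives before trigger-1 unsubscribes.
    late = manager.subscribe("hot-key", "trigger-2")
    manager.unsubscribe("hot-key", "trigger-1")
    try:
        await asyncio.wait_for(late.get(), timeout=0.1)
        late_hung = False
    except asyncio.TimeoutError:
        late_hung = True  # nothing will ever be put on this queue
    group_leaked = "hot-key" in manager._groups
    return isinstance(failure, _PollFailure), late_hung, group_leaked
```

`asyncio.run(demo())` returns `(True, True, True)`. The existing subscriber sees the failure, the late subscriber's `get()` times out, and the dead group stays in `_groups` after trigger-1 leaves.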
   
   The window opens any time a new trigger for the same shared-stream key is 
admitted between the broadcast of `_PollFailure` and the last existing 
subscriber's `unsubscribe` — realistic under high trigger churn on a hot key.
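One possible way to close the window, sketched against a similar hypothetical toy model: `GuardedGroup`, `GuardedManager`, the `dead` property, and the `_joined` map are illustrative names, not existing Airflow code. The idea is for `subscribe` to treat a group whose poll task has already finished as unusable and start a fresh one. `unsubscribe` then removes the `_groups` entry only if it still points at the group the trigger actually joined. That second check matters because the old subscribers still unsubscribe from the dead group after it has been replaced.

```python
from __future__ import annotations

import asyncio


class _PollFailure:
    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


class GuardedGroup:
    """Hypothetical group that can report whether its poll task has finished."""

    def __init__(self, open_stream) -> None:
        self._open_stream = open_stream
        self._subscribers: dict[str, asyncio.Queue] = {}
        self._task: asyncio.Task | None = None

    @property
    def dead(self) -> bool:
        # Once _poll has returned, no queue in this group will be fed again.
        return self._task is not None and self._task.done()

    def start(self) -> None:
        self._task = asyncio.create_task(self._poll())

    def subscribe(self, trigger_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[trigger_id] = queue
        return queue

    async def _poll(self) -> None:
        try:
            async for item in self._open_stream():
                for queue in list(self._subscribers.values()):
                    queue.put_nowait(item)
        except Exception as exc:
            for queue in list(self._subscribers.values()):
                queue.put_nowait(_PollFailure(exc))


class GuardedManager:
    def __init__(self, open_stream) -> None:
        self._open_stream = open_stream
        self._groups: dict[str, GuardedGroup] = {}
        self._joined: dict[str, GuardedGroup] = {}  # trigger_id -> group it joined

    def subscribe(self, key: str, trigger_id: str) -> asyncio.Queue:
        group = self._groups.get(key)
        if group is None or group.dead:
            # Replace a dead group rather than attaching to it. Its remaining
            # subscribers keep their own queues and unsubscribe from it later.
            group = GuardedGroup(self._open_stream)
            self._groups[key] = group
            group.start()
        self._joined[trigger_id] = group
        return group.subscribe(trigger_id)

    def unsubscribe(self, key: str, trigger_id: str) -> None:
        group = self._joined.pop(trigger_id)
        group._subscribers.pop(trigger_id, None)
        # Only drop the mapping if it still points at the group being left,
        # so a stale unsubscribe cannot evict the replacement group.
        if not group._subscribers and self._groups.get(key) is group:
            del self._groups[key]


async def failing_stream():
    raise RuntimeError("connection refused")
    yield  # unreachable; makes this an async generator


async def demo() -> tuple[bool, bool]:
    manager = GuardedManager(failing_stream)
    first = manager.subscribe("hot-key", "trigger-1")
    await first.get()  # trigger-1 sees the failure; the first group is now dead
    late = manager.subscribe("hot-key", "trigger-2")  # gets a fresh group
    manager.unsubscribe("hot-key", "trigger-1")  # must not evict the new group
    outcome = await asyncio.wait_for(late.get(), timeout=1)
    return isinstance(outcome, _PollFailure), "hot-key" in manager._groups
```

`asyncio.run(demo())` returns `(True, True)`. The late subscriber now receives its own `_PollFailure` from the retried stream instead of hanging, and the replacement group survives trigger-1's unsubscribe. Restarting costs one extra connection attempt but lets a transient failure recover. Evicting the group from `_groups` inside `_poll` before fanning out would be another option, though it needs the same identity check in `unsubscribe`.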
   
   ---
   Drafted-by: Claude Code (Opus 4.7)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
