Lee-W commented on code in PR #66584:
URL: https://github.com/apache/airflow/pull/66584#discussion_r3287324300
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1478,12 +1489,39 @@ async def run_trigger(
name = self.triggers[trigger_id]["name"]
self.log.info("trigger %s starting", name)
+
+ # Triggers that opt into a shared underlying I/O stream
+ # (BaseEventTrigger.shared_stream_key returns non-None) consume a
+ # broadcast stream produced by SharedStreamManager and convert it
+ # via filter_shared_stream(). Everything else stays on the original
+ # standalone-run() path.
+ shared_key: Hashable | None = None
+ event_trigger: BaseEventTrigger | None = None
+ if isinstance(trigger, BaseEventTrigger):
+ event_trigger = trigger
+ try:
+ shared_key = event_trigger.shared_stream_key()
Review Comment:
`shared_stream_key()` is now invoked inside the
`with _make_trigger_span(...)` block, after `render_template_fields(context)`,
so any templated attribute is already resolved by the time the key is built.
`shared_key` is still declared before the `with` so that the outer `finally:`
cleanup, which calls `unsubscribe`, can still see it.
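
A minimal sketch of that ordering, not the actual code in `triggerer_job_runner.py`. `FakeTrigger`, `make_span`, and the `subscribed` / `unsubscribed` lists are hypothetical stand-ins for the real trigger, `_make_trigger_span`, and `SharedStreamManager`; only the placement of the declaration, the rendering call, and the cleanup is meant to mirror the change described above.

```python
import asyncio
from collections.abc import Hashable
from contextlib import contextmanager


class FakeTrigger:
    """Hypothetical stand-in for a BaseEventTrigger with one templated field."""

    def __init__(self, topic):
        self.topic = topic

    def render_template_fields(self, context):
        # Toy rendering: substitute a single placeholder from the context.
        self.topic = self.topic.replace("{{ env }}", context["env"])

    def shared_stream_key(self):
        return ("fake", self.topic)


@contextmanager
def make_span(name):
    # Hypothetical stand-in for the tracing span context manager.
    yield


async def run_trigger(trigger, context, subscribed, unsubscribed):
    # Declared before the `with` so the `finally` below can always read it,
    # even if the span body raises before the key is computed.
    shared_key: Hashable | None = None
    try:
        with make_span("trigger"):
            trigger.render_template_fields(context)
            # Built only after rendering, so the key uses resolved values.
            shared_key = trigger.shared_stream_key()
            subscribed.append(shared_key)
            await asyncio.sleep(0)  # placeholder for consuming the stream
    finally:
        # Only unsubscribe if a key was actually obtained.
        if shared_key is not None:
            unsubscribed.append(shared_key)
```

In this sketch, a failure inside `render_template_fields` leaves `shared_key` as `None`, so the cleanup skips the unsubscribe step instead of raising a `NameError`.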