Copilot commented on code in PR #64068:
URL: https://github.com/apache/airflow/pull/64068#discussion_r3025338559


##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -273,6 +273,12 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE
             handle_event_submit(event, task_instance=task_instance, session=session)
 
         # Send an event to assets
+        if event.xcoms:
+            log.warning(
+                "Trigger event %i contains XCom values, which cannot be sent to assets or callbacks. XCom values: %s",
+                trigger_id,
+                event.xcoms,

Review Comment:
   The warning logs the full `event.xcoms` payload. Since XComs can contain 
sensitive or large data, this can leak secrets into logs and create noisy log 
volume. Consider logging only the presence/keys (or a count), and only emitting 
this warning when the trigger actually has asset/callback associations to 
notify (otherwise this will warn even for task-only triggers).
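A minimal sketch of the redaction the comment suggests: log only the key names and a count rather than the payload. `summarize_xcoms` is a hypothetical helper, not part of Airflow.

```python
def summarize_xcoms(xcoms: dict) -> str:
    """Return a log-safe summary of an event's XCom payload (keys only, no values)."""
    return f"{len(xcoms)} XCom value(s) with keys {sorted(xcoms)}"

# The warning could then reference the summary instead of the raw values, e.g.:
# log.warning(
#     "Trigger event %i contains XCom values, which cannot be sent to assets "
#     "or callbacks. %s", trigger_id, summarize_xcoms(event.xcoms),
# )
```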



##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -486,6 +495,11 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses
     # re-serialize the entire dict using serde to ensure consistent structure
     task_instance.next_kwargs = serialize(next_kwargs)
 
+    # Push XCom values if provided by the trigger
+    if event.xcoms:
+        for key, value in event.xcoms.items():
+            task_instance.xcom_push(key=key, value=value, session=session)
+

Review Comment:
   Pushing XComs one-by-one can be expensive: `task_instance.xcom_push()` calls 
`XComModel.set()` which performs a `DagRun.id` lookup plus delete/insert for 
each key. If `event.xcoms` can contain multiple keys, consider batching this 
(e.g., resolve `dag_run_id` once and insert multiple rows in one operation) to 
avoid repeated queries and write amplification.
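One way to picture the suggested batching: build all rows up front (resolving run-scoped identifiers once) and hand them to a single bulk write. `build_xcom_rows` is an illustrative helper; the row shape only loosely mirrors what `XComModel` stores and is not Airflow's actual API.

```python
def build_xcom_rows(
    xcoms: dict, *, dag_id: str, task_id: str, run_id: str, map_index: int = -1
) -> list[dict]:
    """Build one row mapping per XCom key so all rows can be inserted in one batch."""
    return [
        {
            "dag_id": dag_id,
            "task_id": task_id,
            "run_id": run_id,
            "map_index": map_index,
            "key": key,
            "value": value,
        }
        for key, value in xcoms.items()
    ]

# Sketch of usage (assumes a SQLAlchemy session and the XComModel mapping):
# rows = build_xcom_rows(event.xcoms, dag_id=..., task_id=..., run_id=...)
# session.execute(delete_stmt_for_existing_keys)      # one DELETE for all keys
# session.bulk_insert_mappings(XComModel, rows)       # one INSERT round-trip
```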



##########
airflow-core/docs/authoring-and-scheduling/deferring.rst:
##########
@@ -445,12 +445,14 @@ Triggers can have two options: they can either send execution back to the worker
         async def run(self) -> AsyncIterator[TriggerEvent]:
             await asyncio.sleep(self.duration.total_seconds())
             if self.end_from_trigger:
-                yield TaskSuccessEvent()
+                yield TaskSuccessEvent(xcoms={"wait_duration_s": self.duration.total_seconds()})
             else:
                 yield TriggerEvent({"duration": self.duration})
 
 In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator.
 
+Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailureEvent``. This XCom will be pushed when the task instance is marked as success or failure.

Review Comment:
   The docs refer to `TaskFailureEvent`, but the actual event class is 
`TaskFailedEvent` (see `airflow/triggers/base.py`). This mismatch can confuse 
users copying the example; please update the name here (and elsewhere in this 
section if applicable).
   ```suggestion
   Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailedEvent``. This XCom will be pushed when the task instance is marked as success or failure.
   ```
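To see why the name matters when copying the example, here is a failure-path counterpart using a minimal stand-in for ``TaskFailedEvent`` (the real class lives in ``airflow.triggers.base``; this stub only mirrors the ``xcoms`` keyword this PR introduces, and assumes the failure event accepts it the same way ``TaskSuccessEvent`` does).

```python
import asyncio
from dataclasses import dataclass, field


# Stand-in for airflow.triggers.base.TaskFailedEvent -- note the name is
# "TaskFailedEvent", not "TaskFailureEvent". Only the xcoms kwarg is modeled.
@dataclass
class TaskFailedEvent:
    xcoms: dict = field(default_factory=dict)


async def run():
    # A trigger ending the task as failed can still attach XCom values,
    # which would be pushed when the task instance is marked as failed.
    yield TaskFailedEvent(xcoms={"attempts": 3})


async def collect():
    return [event async for event in run()]


events = asyncio.run(collect())
```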



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
