kaxil commented on code in PR #45245:
URL: https://github.com/apache/airflow/pull/45245#discussion_r1898592480


##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -436,6 +448,36 @@ def run(ti: RuntimeTaskInstance, log: Logger):
         SUPERVISOR_COMMS.send_request(msg=msg, log=log)
 
 
+def _push_xcom_if_needed(result: Any, ti: RuntimeTaskInstance):
+    """Push XCom values when task has ``do_xcom_push`` set to ``True`` and the task returns a result."""
+    if ti.task.do_xcom_push:
+        xcom_value = result
+    else:
+        xcom_value = None
+
+    # If the task returns a result, push an XCom containing it.
+    if xcom_value is None:
+        return
+
+    # If the task has multiple outputs, push each output as a separate XCom.
+    if ti.task.multiple_outputs:
+        if not isinstance(xcom_value, Mapping):
+            raise TypeError(
+                f"Returned output was type {type(xcom_value)} expected dictionary for multiple_outputs"
+            )
+        for key in xcom_value.keys():
+            if not isinstance(key, str):
+                raise TypeError(
+                    "Returned dictionary keys must be strings when using "
+                    f"multiple_outputs, found {key} ({type(key)}) instead"
+                )
+        for k, v in result.items():
+            ti.xcom_push(k, v)
+
+    # TODO: Use constant for XCom return key & use serialize_value from Task SDK
+    ti.xcom_push("return_value", result)

Review Comment:
   
https://github.com/apache/airflow/blob/60cd5ad302d9140650160a89d86288f145118fb1/airflow/models/taskinstance.py#L766-L768



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to