uranusjr commented on code in PR #32351:
URL: https://github.com/apache/airflow/pull/32351#discussion_r1259264021
##########
airflow/utils/task_group.py:
##########
@@ -207,13 +207,23 @@ def __iter__(self):
else:
yield child
- def add(self, task: DAGNode) -> None:
+ def add(self, task: DAGNode) -> DAGNode:
"""Add a task to this TaskGroup.
:meta private:
"""
from airflow.models.abstractoperator import AbstractOperator
-
+ from airflow.models.xcom_arg import PlainXComArg
+
+ if TaskGroupContext.active:
+ if isinstance(task, PlainXComArg):
+ task = task.operator
Review Comment:
I feel this unpacking code should be the responsibility of
`add_to_taskgroup`, it should pass the underlying task in, not the XComArg
wrapper.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]