This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 18e17f2f37a60717482e919b901ee727354aef40 Author: Evgenii Prusov <[email protected]> AuthorDate: Tue Jul 22 02:57:25 2025 +0200 Fixed Task group names duplication in Task's task_id for MappedOperator (#53532) Co-authored-by: Wei Lee <[email protected]> Co-authored-by: Evgenii Prusov <[email protected]> (cherry picked from commit 6b618efa91b4ef80c4821537b30f14a49c2badb6) --- task-sdk/src/airflow/sdk/bases/operator.py | 8 ++++++-- .../task_sdk/definitions/test_mappedoperator.py | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py index 5de513bda9e..a3d6f58edfd 100644 --- a/task-sdk/src/airflow/sdk/bases/operator.py +++ b/task-sdk/src/airflow/sdk/bases/operator.py @@ -1014,9 +1014,13 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta): ): # Note: Metaclass handles passing in the DAG/TaskGroup from active context manager, if any - self.task_id = task_group.child_id(task_id) if task_group else task_id - if not self.__from_mapped and task_group: + # Only apply task_group prefix if this operator was not created from a mapped operator + # Mapped operators already have the prefix applied during their creation + if task_group and not self.__from_mapped: + self.task_id = task_group.child_id(task_id) task_group.add(self) + else: + self.task_id = task_id super().__init__() self.task_group = task_group diff --git a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py index f9c075352a0..7a4b43c8104 100644 --- a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py +++ b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py @@ -691,3 +691,25 @@ def test_mapped_xcom_push_skipped_tasks(create_runtime_ti, mock_supervisor_comms ), ] ) + + +def test_mapped_operator_in_task_group_no_duplicate_prefix(): + """Test that task_id doesn't get duplicated prefix when unmapping a mapped operator in a task group.""" + from airflow.sdk.definitions.taskgroup import TaskGroup + + with DAG("test-dag"): + with TaskGroup(group_id="tg1") as tg1: + # Create a mapped task within the task group + mapped_task = MockOperator.partial(task_id="mapped_task", arg1="a").expand(arg2=["a", "b", "c"]) + + # Check the mapped operator has correct task_id + assert mapped_task.task_id == "tg1.mapped_task" + assert mapped_task.task_group == tg1 + assert mapped_task.task_group.group_id == "tg1" + + # Simulate what happens during execution - unmap the operator + # unmap expects resolved kwargs + unmapped = mapped_task.unmap({"arg2": "a"}) + + # The unmapped operator should have the same task_id, not a duplicate prefix + assert unmapped.task_id == "tg1.mapped_task", f"Expected 'tg1.mapped_task' but got '{unmapped.task_id}'"
