This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 18e17f2f37a60717482e919b901ee727354aef40
Author: Evgenii Prusov <[email protected]>
AuthorDate: Tue Jul 22 02:57:25 2025 +0200

    Fixed Task group names duplication in Task's task_id for MappedOperator (#53532)
    
    Co-authored-by: Wei Lee <[email protected]>
    Co-authored-by: Evgenii Prusov <[email protected]>
    (cherry picked from commit 6b618efa91b4ef80c4821537b30f14a49c2badb6)
---
 task-sdk/src/airflow/sdk/bases/operator.py         |  8 ++++++--
 .../task_sdk/definitions/test_mappedoperator.py    | 22 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py
index 5de513bda9e..a3d6f58edfd 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -1014,9 +1014,13 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
     ):
         # Note: Metaclass handles passing in the DAG/TaskGroup from active context manager, if any
 
-        self.task_id = task_group.child_id(task_id) if task_group else task_id
-        if not self.__from_mapped and task_group:
+        # Only apply task_group prefix if this operator was not created from a mapped operator
+        # Mapped operators already have the prefix applied during their creation
+        if task_group and not self.__from_mapped:
+            self.task_id = task_group.child_id(task_id)
             task_group.add(self)
+        else:
+            self.task_id = task_id
 
         super().__init__()
         self.task_group = task_group
diff --git a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
index f9c075352a0..7a4b43c8104 100644
--- a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
+++ b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py
@@ -691,3 +691,25 @@ def test_mapped_xcom_push_skipped_tasks(create_runtime_ti, mock_supervisor_comms
             ),
         ]
     )
+
+
+def test_mapped_operator_in_task_group_no_duplicate_prefix():
+    """Test that task_id doesn't get duplicated prefix when unmapping a mapped operator in a task group."""
+    from airflow.sdk.definitions.taskgroup import TaskGroup
+
+    with DAG("test-dag"):
+        with TaskGroup(group_id="tg1") as tg1:
+            # Create a mapped task within the task group
+            mapped_task = MockOperator.partial(task_id="mapped_task", arg1="a").expand(arg2=["a", "b", "c"])
+
+    # Check the mapped operator has correct task_id
+    assert mapped_task.task_id == "tg1.mapped_task"
+    assert mapped_task.task_group == tg1
+    assert mapped_task.task_group.group_id == "tg1"
+
+    # Simulate what happens during execution - unmap the operator
+    # unmap expects resolved kwargs
+    unmapped = mapped_task.unmap({"arg2": "a"})
+
+    # The unmapped operator should have the same task_id, not a duplicate prefix
+    assert unmapped.task_id == "tg1.mapped_task", f"Expected 'tg1.mapped_task' but got '{unmapped.task_id}'"

Reply via email to