lkindere opened a new issue, #64526: URL: https://github.com/apache/airflow/issues/64526
### Apache Airflow version 3.1.6 ### What happened and how to reproduce it? When clearing a single failed task instance in a mapped task group with downstream option, it will clear all instances of the downstream task. For example if we have a mapped task group of 3 tasks: Task1_A, Task1_B, Task1_C >> Task2_A, Task2_B, Task2_C Clearing Task1_B in the UI with downstream selected would clear Task1_B and Task2_A, Task2_B, Task2_C. Excepted behaviour: Clearing Task1_B and Task1_C <img width="1745" height="572" alt="Image" src="https://github.com/user-attachments/assets/58a8d8a3-c756-4b16-a395-87fd9c2c85b8" /> Trying to clear only in index 0: <img width="1745" height="572" alt="Image" src="https://github.com/user-attachments/assets/b3f752c0-dda5-4262-bff4-cc518b0e8dcc" /> We see all 3 instances of the second task get selected: <img width="1745" height="572" alt="Image" src="https://github.com/user-attachments/assets/f6289834-baeb-4eed-b26d-a64b92af5013" /> However if we clear the dag run itself for failed tasks for example: <img width="1745" height="777" alt="Image" src="https://github.com/user-attachments/assets/4e1aa7ec-abb4-4ab1-9df9-990e1b0dfd42" /> <img width="1745" height="777" alt="Image" src="https://github.com/user-attachments/assets/af251876-f55f-4a2f-a655-2cf21d05c09f" /> It is correctly resolving only these 2 failed tasks <img width="1745" height="777" alt="Image" src="https://github.com/user-attachments/assets/70feb67e-c0b9-4318-8c7f-5fca64c8fbf9" /> So it must be possible somehow but only the manual clearing of an individual task seems to be broken. Example DAG: ``` from datetime import datetime from airflow.exceptions import AirflowException from airflow.sdk import dag, task, task_group @dag( dag_id="test_task_group_clearing", start_date=datetime(2026, 1, 1), schedule=None, catchup=False, ) def test_task_group_clearing(): @task def get_items(): return ["item_A", "item_B", "item_C"] @task_group(group_id="processing_group") def process_item_group(item: str): @task def step_1(val: str): if val == "item_B": raise AirflowException("Mock fail") return val @task def step_2(val: str): return val step_1(val=item) >> step_2(val=item) items = get_items() process_item_group.expand(item=items) test_task_group_clearing() ``` ### What you think should happen instead? _No response_ ### Operating System _No response_ ### Versions of Apache Airflow Providers _No response_ ### Deployment None ### Deployment details _No response_ ### Anything else? _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
