lkindere opened a new issue, #64526:
URL: https://github.com/apache/airflow/issues/64526

   ### Apache Airflow version
   
   3.1.6
   
   ### What happened and how to reproduce it?
   
   When clearing a single failed task instance in a mapped task group with 
downstream option, it will clear all instances of the downstream task. For 
example if we have a mapped task group of 3 tasks:
   
   Task1_A, Task1_B, Task1_C
   >>
   Task2_A, Task2_B, Task2_C
   
   Clearing Task1_B in the UI with downstream selected would clear Task1_B and 
Task2_A, Task2_B, Task2_C.
   Excepted behaviour: Clearing Task1_B and Task1_C
   
   <img width="1745" height="572" alt="Image" 
src="https://github.com/user-attachments/assets/58a8d8a3-c756-4b16-a395-87fd9c2c85b8";
 />
   Trying to clear only in index 0:
   
   <img width="1745" height="572" alt="Image" 
src="https://github.com/user-attachments/assets/b3f752c0-dda5-4262-bff4-cc518b0e8dcc";
 />
   
   We see all 3 instances of the second task get selected:
   
   <img width="1745" height="572" alt="Image" 
src="https://github.com/user-attachments/assets/f6289834-baeb-4eed-b26d-a64b92af5013";
 />
   
   However if we clear the dag run itself for failed tasks for example:
   
   <img width="1745" height="777" alt="Image" 
src="https://github.com/user-attachments/assets/4e1aa7ec-abb4-4ab1-9df9-990e1b0dfd42";
 />
   
   <img width="1745" height="777" alt="Image" 
src="https://github.com/user-attachments/assets/af251876-f55f-4a2f-a655-2cf21d05c09f";
 />
   
   It is correctly resolving only these 2 failed tasks
   <img width="1745" height="777" alt="Image" 
src="https://github.com/user-attachments/assets/70feb67e-c0b9-4318-8c7f-5fca64c8fbf9";
 />
   
   So it must be possible somehow but only the manual clearing of an individual 
task seems to be broken.
   
   
   Example DAG:
   ```
   from datetime import datetime
   
   from airflow.exceptions import AirflowException
   from airflow.sdk import dag, task, task_group
   
   @dag(
       dag_id="test_task_group_clearing",
       start_date=datetime(2026, 1, 1),
       schedule=None,
       catchup=False,
   )
   def test_task_group_clearing():
   
       @task
       def get_items():
           return ["item_A", "item_B", "item_C"]
   
       @task_group(group_id="processing_group")
       def process_item_group(item: str):
           @task
           def step_1(val: str):
               if val == "item_B":
                   raise AirflowException("Mock fail")
               return val
   
           @task
           def step_2(val: str):
               return val
   
           step_1(val=item) >> step_2(val=item)
   
       items = get_items()
       process_item_group.expand(item=items)
   
   test_task_group_clearing()
   ```
   
   ### What you think should happen instead?
   
   _No response_
   
   ### Operating System
   
   _No response_
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   None
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to