gavinhonl opened a new issue, #39801:
URL: https://github.com/apache/airflow/issues/39801

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.8.3 and 2.9.1
   
   ### What happened?
   
   If a DAG is run for the first time, a task with a 
'none_failed_min_one_success' trigger rule is almost immediately skipped before 
the dependant upstream tasks are complete resulting in:
   
![image](https://github.com/apache/airflow/assets/39799836/31e57fae-f516-49e3-bebe-900660facb3f)
   
   If I clear the same DAG run, with the process_items task group fully 
expanded, the DAG completes as expected:
   
![image](https://github.com/apache/airflow/assets/39799836/04ad50f0-10ee-4f33-b5ee-72be3b6e8210)
   
   
   
   ### What you think should happen instead?
   
   If the a_end task has a 'none_failed_min_one_success' trigger rule it should 
only be run when its upstream dependant tasks are complete and not in a failed 
or upstream_failed state and at least one upstream task has succeeded.
   
   ### How to reproduce
   
   DAG Code:
   
   `from datetime import datetime, timedelta
   
   from airflow.decorators import task, task_group
   from airflow import DAG, AirflowException
   from airflow.operators.empty import EmptyOperator
   
   default_args = {
       'owner': 'insight_techops',
       'depends_on_past': False,
       'start_date': datetime(2019, 1, 1),
       'provide_context': True,
       'retries': 0,
       'retry_delay': timedelta(seconds=30)
   }
   
   @task
   def parse_csv_schedule():
       items_dict = {'A': '1', 'B': '2', 'C': '3', 'D': '4'}
       return items_dict
   
   @task_group(group_id="process_items")
   def process_items(items_dict: dict):
       @task
       def retrieve_item_metadata(items: dict):
           media_asset = items[0]
           print(media_asset)
           raise AirflowException("Failing")
   
       @task_group(group_id="a_process")
       def a_process():
           @task.branch(retries=0)
           def a_start():
               a_handling = 'none'
               if a_handling == 'bypass':
                   return "process_items.a_process.a_bypass"
               else:
                   return "process_items.a_process.a_end"
               
           a_start = a_start()
           a_bypass = EmptyOperator(task_id='a_bypass')
           a_end = EmptyOperator(task_id='a_end', 
trigger_rule='none_failed_min_one_success')
           a_start >> a_bypass >> a_end
           a_start >> a_end
   
       @task
       def mark_item_as_done():
           try:
               print(f"Marking item as Done")
           except Exception as error:
               raise AirflowException(error)
       
       item_dict = retrieve_item_metadata(items=items_dict)
       mark_item_as_done = mark_item_as_done()
       a_process = a_process()
       item_dict >> a_process >> mark_item_as_done
   
   with DAG(dag_id='af055_Debugger', default_args=default_args, 
max_active_runs=1, schedule_interval=None, tags=['sales']):
       end = EmptyOperator(task_id='end')
       items_dict = parse_csv_schedule()
       items_dict >> process_items.expand(items_dict=items_dict) >> end
   `
   
   I'm pretty sure it is related to the 'none_failed_min_one_success' as if I 
cause an upstream task within the TaskGroup to fail, the task with the trigger 
rule in question (and subsequent downstream tasks) are skipped:
   
![image](https://github.com/apache/airflow/assets/39799836/c39d945a-e543-4be8-b6c4-b9e59bb0541e)
   
   Compared to if the trigger rule for the a_end is default:
   
![image](https://github.com/apache/airflow/assets/39799836/065b2fb8-ae4d-4f53-bb8a-07bef94207eb)
   
   ### Operating System
   
   Mac OS 13.6 and Ubuntu 22.0.4
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Dev:
   Docker Desktop, Helm, Kubernetes
   
   Prod:
   EKS, Helm
   
   ### Anything else?
   
   100% reproducible
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to