gavinhonl opened a new issue, #39801: URL: https://github.com/apache/airflow/issues/39801
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.8.3 and 2.9.1

### What happened?

If a DAG is run for the first time, a task with a 'none_failed_min_one_success' trigger rule is skipped almost immediately, before the upstream tasks it depends on are complete, resulting in:

(screenshot)

If I clear the same DAG run, with the process_items task group fully expanded, the DAG completes as expected:

(screenshot)

### What you think should happen instead?

If the a_end task has a 'none_failed_min_one_success' trigger rule, it should only run once all of its upstream tasks are complete, none of them is in a failed or upstream_failed state, and at least one upstream task has succeeded. (A reduced sketch of this expected behaviour is appended below.)

### How to reproduce

DAG code:

```python
from datetime import datetime, timedelta

from airflow import DAG, AirflowException
from airflow.decorators import task, task_group
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'insight_techops',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'provide_context': True,
    'retries': 0,
    'retry_delay': timedelta(seconds=30),
}


@task
def parse_csv_schedule():
    items_dict = {'A': '1', 'B': '2', 'C': '3', 'D': '4'}
    return items_dict


@task_group(group_id="process_items")
def process_items(items_dict: dict):
    @task
    def retrieve_item_metadata(items: dict):
        media_asset = items[0]
        print(media_asset)
        raise AirflowException("Failing")

    @task_group(group_id="a_process")
    def a_process():
        @task.branch(retries=0)
        def a_start():
            a_handling = 'none'
            if a_handling == 'bypass':
                return "process_items.a_process.a_bypass"
            else:
                return "process_items.a_process.a_end"

        a_start = a_start()
        a_bypass = EmptyOperator(task_id='a_bypass')
        a_end = EmptyOperator(task_id='a_end', trigger_rule='none_failed_min_one_success')

        a_start >> a_bypass >> a_end
        a_start >> a_end

    @task
    def mark_item_as_done():
        try:
            print("Marking item as Done")
        except Exception as error:
            raise AirflowException(error)

    item_dict = retrieve_item_metadata(items=items_dict)
    mark_item_as_done = mark_item_as_done()
    a_process = a_process()

    item_dict >> a_process >> mark_item_as_done


with DAG(dag_id='af055_Debugger', default_args=default_args, max_active_runs=1,
         schedule_interval=None, tags=['sales']):
    end = EmptyOperator(task_id='end')
    items_dict = parse_csv_schedule()
    items_dict >> process_items.expand(items_dict=items_dict) >> end
```

I'm pretty sure it is related to 'none_failed_min_one_success': if I cause an upstream task within the TaskGroup to fail, the task with that trigger rule (and its downstream tasks) is skipped:

(screenshot)

Compared to when a_end uses the default trigger rule:

(screenshot)

### Operating System

macOS 13.6 and Ubuntu 22.04

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

Dev: Docker Desktop, Helm, Kubernetes
Prod: EKS, Helm

### Anything else?

100% reproducible

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
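For reference, a reduced sketch of the expected 'none_failed_min_one_success' behaviour, keeping the same branch/bypass/join shape but dropping the mapped task group. The dag_id and task ids here are made up for illustration, and this stripped-down form is assumed not to hit the bug, since the report only sees the premature skip on the first run of the expanded group:

```python
# Reduced sketch (hypothetical dag_id and task ids): a branch that skips
# 'bypass' and goes straight to 'join'. With the
# 'none_failed_min_one_success' rule, 'join' should wait until both
# upstream tasks have settled and then run, because 'start' succeeded
# and 'bypass' was merely skipped by the branch.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="trigger_rule_sketch",  # hypothetical
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
):
    @task.branch
    def start():
        # Mirror a_start in the report: never take the bypass edge.
        return "join"

    bypass = EmptyOperator(task_id="bypass")
    join = EmptyOperator(
        task_id="join",
        trigger_rule="none_failed_min_one_success",
    )

    s = start()
    s >> bypass >> join
    s >> join
```

If the trigger rule is honoured, join runs here once start succeeds and bypass is skipped; the report is that the equivalent shape inside process_items.expand(...) skips a_end prematurely on the first run.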