benbuckman opened a new issue, #34023: URL: https://github.com/apache/airflow/issues/34023
### Apache Airflow version

2.7.0

### What happened

I have the following DAG:

```python
from __future__ import annotations

from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule


@task
def get_records() -> list[str]:
    return ["a", "b", "c"]


@task
def submit_job(record: str) -> None:
    pass


@task
def fake_sensor(record: str) -> bool:
    raise RuntimeError("boo")


@task
def deliver_record(record: str) -> None:
    pass


@task(trigger_rule=TriggerRule.ONE_FAILED)
def handle_failed_delivery(record: str) -> None:
    pass


@task_group(group_id="deliver_records")
def deliver_record_task_group(record: str):
    (
        submit_job(record=record)
        >> fake_sensor(record=record)
        >> deliver_record(record=record)
        >> handle_failed_delivery(record=record)
    )


@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    deliver_record_task_group.expand(record=records)


demo_trigger_one_failed()
```

- `fake_sensor` simulates a task that raises an exception. (It could be a `@task.sensor` raising an `AirflowSensorTimeout`; it doesn't matter, the behavior is the same.)
- `handle_failed_delivery`'s `TriggerRule.ONE_FAILED` means **it is supposed to run whenever any upstream task fails.** So when `fake_sensor` fails, `handle_failed_delivery` should run.

But this does not work. `handle_failed_delivery` is skipped, and (based on the UI) it is skipped very early, before it can know whether the upstream tasks have completed successfully or errored. Here's what I see, progressively (see "How to reproduce" below for how I got this):

| started ... | skipped too early ... | fake sensor about to fail ... | ... done, didn't run |
|--------|--------|--------|--------|
| <img width="312" alt="Screenshot 2023-09-01 at 3 26 49 PM" src="https://github.com/apache/airflow/assets/354655/2a9bb897-dd02-4c03-a381-2deb774d1072"> | <img width="310" alt="Screenshot 2023-09-01 at 3 26 50 PM" src="https://github.com/apache/airflow/assets/354655/11d0f8c5-c7c0-400f-95dd-4ed3992701d0"> | <img width="308" alt="Screenshot 2023-09-01 at 3 26 53 PM" src="https://github.com/apache/airflow/assets/354655/dd81e42e-ca24-45fa-a18d-df2b435c3d82"> | <img width="309" alt="Screenshot 2023-09-01 at 3 26 56 PM" src="https://github.com/apache/airflow/assets/354655/d3a3303c-91d9-498a-88c3-f1aa1e8580b6"> |

If I remove the task group and instead do

```python
@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    (
        submit_job(record=records)
        >> fake_sensor.expand(record=records)
        >> deliver_record.expand(record=records)
        >> handle_failed_delivery.expand(record=records)
    )
```

then it does the right thing:

| started ... | waiting ... | ... done, triggered correctly |
|--------|--------|--------|
| <img width="301" alt="Screenshot 2023-09-01 at 3 46 48 PM" src="https://github.com/apache/airflow/assets/354655/7e52979b-0161-4469-b284-3411a0b1b1c4"> | <img width="306" alt="Screenshot 2023-09-01 at 3 46 50 PM" src="https://github.com/apache/airflow/assets/354655/733654f3-8cb0-4181-b6b7-bad02994469d"> | <img width="304" alt="Screenshot 2023-09-01 at 3 46 53 PM" src="https://github.com/apache/airflow/assets/354655/13ffb46f-d5ca-4e7a-8d60-caad2e4a7827"> |

### What you think should happen instead

The behavior with the task group should be the same as without the task group: the `handle_failed_delivery` task with `trigger_rule=TriggerRule.ONE_FAILED` should run when the upstream `fake_sensor` task fails.

### How to reproduce

1. Put the above DAG at a local path, `/tmp/dags/demo_trigger_one_failed.py`.
2.
`docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 apache/airflow:2.7.0-python3.10 bash`
3. In the container:
   ```
   airflow db init
   airflow users create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow
   airflow scheduler --daemon
   airflow webserver
   ```
4. Open `http://localhost:8080` on the host. Log in with `airflow` / `airflow`. Run the DAG.

I tested this with:

- `apache/airflow:2.6.2-python3.10`
- `apache/airflow:2.6.3-python3.10`
- `apache/airflow:2.7.0-python3.10`

### Operating System

Debian GNU/Linux 11 (bullseye)

### Versions of Apache Airflow Providers

n/a

### Deployment

Other Docker-based deployment

### Deployment details

This can be reproduced using standalone Docker images; see the repro steps above.

### Anything else

I wonder if this is related to (or fixed by?) https://github.com/apache/airflow/issues/33446 -> https://github.com/apache/airflow/pull/33732? (The latter was "added to the `Airflow 2.7.1` milestone 3 days ago." I can try to install that pre-release code in the container and see if it's fixed.)

_edit_: nope, [not fixed](https://github.com/apache/airflow/issues/34023#issuecomment-1703298280)

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
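To state the expected behavior precisely, here is a minimal pure-Python sketch of the `ONE_FAILED` semantics the report relies on. This is my own simplified model for illustration only (the `State` enum and `one_failed_should_run` helper are hypothetical names), not Airflow's actual trigger-rule evaluation code:

```python
# Simplified illustration of ONE_FAILED semantics -- NOT Airflow's implementation.
from enum import Enum


class State(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"


def one_failed_should_run(upstream_states: list[State]) -> bool:
    """ONE_FAILED: the task should run as soon as at least one upstream task
    has failed (directly or via upstream_failed); it must not be skipped
    merely because some upstreams did not succeed."""
    return any(s in (State.FAILED, State.UPSTREAM_FAILED) for s in upstream_states)


# With the DAG above: submit_job succeeded, fake_sensor failed, and
# deliver_record became upstream_failed -- so handle_failed_delivery should run.
print(one_failed_should_run([State.SUCCESS, State.FAILED, State.UPSTREAM_FAILED]))
```

Under this model, the mapped-task-group case and the plain `.expand()` case should evaluate identically for each map index, which is why the early skip inside the task group looks like a bug rather than intended trigger-rule behavior.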