benbuckman opened a new issue, #34023:
URL: https://github.com/apache/airflow/issues/34023

   ### Apache Airflow version
   
   2.7.0
   
   ### What happened
   
   I have the following DAG:
   
   ```python
   from __future__ import annotations
   from datetime import datetime
   
   from airflow.decorators import dag, task, task_group
   from airflow.utils.trigger_rule import TriggerRule
   
   @task
   def get_records() -> list[str]:
       return ["a", "b", "c"]
   
   
   @task
   def submit_job(record: str) -> None:
       pass
   
   @task
   def fake_sensor(record: str) -> bool:
       raise RuntimeError("boo")
   
   
   @task
   def deliver_record(record: str) -> None:
       pass
   
   
   @task(trigger_rule=TriggerRule.ONE_FAILED)
   def handle_failed_delivery(record: str) -> None:
       pass
   
   
   @task_group(group_id="deliver_records")
   def deliver_record_task_group(record: str):
       (
           submit_job(record=record)
           >> fake_sensor(record=record)
           >> deliver_record(record=record)
           >> handle_failed_delivery(record=record)
       )
   
   @dag(
       dag_id="demo_trigger_one_failed",
       schedule=None,
       start_date=datetime(2023, 1, 1),
   )
   def demo_trigger_one_failed() -> None:
       records = get_records()
       deliver_record_task_group.expand(record=records)
   
   
   demo_trigger_one_failed()
   ```
   
   - `fake_sensor` simulates a task that raises an exception. (It could be a `@task.sensor` raising an `AirflowSensorTimeout`; it doesn't matter, the behavior is the same.)
   - `handle_failed_delivery`'s `TriggerRule.ONE_FAILED` means **it is supposed to run whenever any task upstream fails.** So when `fake_sensor` fails (and its downstream `deliver_record` is therefore marked `upstream_failed`), `handle_failed_delivery` should run.
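
To make the expected semantics concrete, here is a simplified model of how `ONE_FAILED` is meant to decide for a single task instance. This is a hypothetical sketch, not Airflow's actual `TriggerRuleDep` code. It assumes that `ONE_FAILED` counts both `failed` and `upstream_failed` direct-upstream states as failures, fires as soon as one is seen, and skips only once every direct upstream task has finished without a failure. `Decision` and `evaluate_one_failed` are illustrative names.

```python
from __future__ import annotations

from enum import Enum


class Decision(Enum):
    WAIT = "wait"  # outcome unknown yet; leave the task instance unscheduled for now
    RUN = "run"
    SKIP = "skip"


# Upstream states this model treats as "a failure happened upstream" (assumption).
FAILED_STATES = {"failed", "upstream_failed"}
# Upstream states this model treats as finished (assumption).
DONE_STATES = {"success", "skipped", "failed", "upstream_failed"}


def evaluate_one_failed(upstream_states: list[str]) -> Decision:
    """Hypothetical, simplified model of TriggerRule.ONE_FAILED.

    `upstream_states` holds the current state of each *direct* upstream
    task instance, e.g. "scheduled", "running", "success", "upstream_failed".
    """
    if any(state in FAILED_STATES for state in upstream_states):
        # At least one upstream failed: fire right away, without waiting
        # for the remaining upstream tasks to finish.
        return Decision.RUN
    if all(state in DONE_STATES for state in upstream_states):
        # Everything upstream finished and nothing failed: nothing to handle.
        return Decision.SKIP
    # Something upstream is still scheduled/queued/running: too early to decide.
    return Decision.WAIT


# handle_failed_delivery's direct upstream is deliver_record.
print(evaluate_one_failed(["scheduled"]))        # Decision.WAIT
print(evaluate_one_failed(["upstream_failed"]))  # Decision.RUN
print(evaluate_one_failed(["success"]))          # Decision.SKIP
```

Under this model, skipping `handle_failed_delivery` while `fake_sensor` is still running (so `deliver_record` has not finished) corresponds to returning `SKIP` where the only defensible answer is `WAIT`.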
   
   But this does not work. `handle_failed_delivery` is skipped, and (based on the UI) it's skipped very early, before it can know whether the upstream tasks have succeeded or failed.
   
   Here's what I see, step by step (see `How to reproduce` below for how I got this):
   
   | started ... | skipped too early ... | fake sensor about to fail ... | ... done, didn't run |
   |--------|--------|--------|--------|
   | <img width="312" alt="Screenshot 2023-09-01 at 3 26 49 PM" src="https://github.com/apache/airflow/assets/354655/2a9bb897-dd02-4c03-a381-2deb774d1072"> | <img width="310" alt="Screenshot 2023-09-01 at 3 26 50 PM" src="https://github.com/apache/airflow/assets/354655/11d0f8c5-c7c0-400f-95dd-4ed3992701d0"> | <img width="308" alt="Screenshot 2023-09-01 at 3 26 53 PM" src="https://github.com/apache/airflow/assets/354655/dd81e42e-ca24-45fa-a18d-df2b435c3d82"> | <img width="309" alt="Screenshot 2023-09-01 at 3 26 56 PM" src="https://github.com/apache/airflow/assets/354655/d3a3303c-91d9-498a-88c3-f1aa1e8580b6"> |
   
   If I remove the task group and instead write:
   ```python
   @dag(
       dag_id="demo_trigger_one_failed",
       schedule=None,
       start_date=datetime(2023, 1, 1),
   )
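   def demo_trigger_one_failed() -> None:
       # Reuses the imports and @task definitions from the first DAG above.
       records = get_records()
       (
           # Note: submit_job is not expanded here, so it runs once with the whole list.
           submit_job(record=records)
           >> fake_sensor.expand(record=records)
           >> deliver_record.expand(record=records)
           >> handle_failed_delivery.expand(record=records)
       )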
   ```
   
   then it does the right thing:
   
   | started ... | waiting ... | ... done, triggered correctly |
   |--------|--------|--------|
   | <img width="301" alt="Screenshot 2023-09-01 at 3 46 48 PM" src="https://github.com/apache/airflow/assets/354655/7e52979b-0161-4469-b284-3411a0b1b1c4"> | <img width="306" alt="Screenshot 2023-09-01 at 3 46 50 PM" src="https://github.com/apache/airflow/assets/354655/733654f3-8cb0-4181-b6b7-bad02994469d"> | <img width="304" alt="Screenshot 2023-09-01 at 3 46 53 PM" src="https://github.com/apache/airflow/assets/354655/13ffb46f-d5ca-4e7a-8d60-caad2e4a7827"> |
   
   
   
   
   ### What you think should happen instead
   
   The behavior with the task group should be the same as without the task group: the `handle_failed_delivery` task with `trigger_rule=TriggerRule.ONE_FAILED` should run when the upstream `fake_sensor` task fails.
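
The expected outcome for the two DAG shapes can be modeled with a small state-propagation sketch. It rests on assumptions, based on our reading of how Airflow resolves mapped dependencies: inside a mapped task group each map index gets its own copy of the chain and depends only on the same index, whereas separately expanded tasks depend on *all* instances of their upstream mapped task. It also assumes the default `ALL_SUCCESS` rule everywhere except `ONE_FAILED` on the handler. All function names are hypothetical.

```python
from __future__ import annotations

from itertools import product

# Task chain from the DAGs above, in dependency order.
TASKS = ["submit_job", "fake_sensor", "deliver_record", "handle_failed_delivery"]


def mapped_group_edges(n: int) -> set[tuple[str, str]]:
    """Chain inside one mapped task group (first DAG).

    Assumption: an instance depends only on the upstream instance with
    the same map index.
    """
    return {
        (f"{up}[{i}]", f"{down}[{i}]")
        for i in range(n)
        for up, down in zip(TASKS, TASKS[1:])
    }


def separately_mapped_edges(n: int) -> set[tuple[str, str]]:
    """Each task expanded on its own, no task group (second DAG).

    Assumption: every instance of a mapped task depends on all instances of
    its upstream mapped task. submit_job is not expanded there, so it is a
    single instance.
    """
    edges = {("submit_job", f"fake_sensor[{j}]") for j in range(n)}
    for up, down in zip(TASKS[1:], TASKS[2:]):
        for i, j in product(range(n), repeat=2):
            edges.add((f"{up}[{i}]", f"{down}[{j}]"))
    return edges


def final_states(edges: set[tuple[str, str]], failing: set[str]) -> dict[str, str]:
    """Propagate states: ALL_SUCCESS everywhere, ONE_FAILED on the handler."""
    upstream: dict[str, list[str]] = {}
    for up, down in edges:
        upstream.setdefault(down, []).append(up)

    def task_of(node: str) -> str:
        return node.split("[")[0]

    nodes = {node for edge in edges for node in edge}
    states: dict[str, str] = {}
    # Visit tasks in chain order so upstream states are known before use.
    for node in sorted(nodes, key=lambda name: TASKS.index(task_of(name))):
        upstream_bad = any(
            states[u] in ("failed", "upstream_failed") for u in upstream.get(node, [])
        )
        if task_of(node) == "handle_failed_delivery":
            # ONE_FAILED: "success" here simply means "the handler ran".
            states[node] = "success" if upstream_bad else "skipped"
        elif upstream_bad:
            states[node] = "upstream_failed"
        elif node in failing:
            states[node] = "failed"
        else:
            states[node] = "success"
    return states


# Hypothetical variation: only the second record's sensor fails.
grouped = final_states(mapped_group_edges(3), {"fake_sensor[1]"})
print(grouped["handle_failed_delivery[1]"], grouped["handle_failed_delivery[0]"])  # success skipped
flat = final_states(separately_mapped_edges(3), {"fake_sensor[1]"})
print(flat["handle_failed_delivery[0]"])  # success
```

In this model the task-group version should run the handler for exactly the records whose sensor failed (in the DAG above every `fake_sensor` raises, so all three handlers should run), while the flat version fires the handler for every index as soon as any sensor fails. So the flat DAG is a useful comparison, but not a strictly equivalent workaround.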
   
   ### How to reproduce
   
   1. Put the above DAG at a local path, `/tmp/dags/demo_trigger_one_failed.py`.
   
   2. `docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 apache/airflow:2.7.0-python3.10 bash`
   
   3. In the container:
       ```bash
       airflow db init
       airflow users create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow
       airflow scheduler --daemon
       airflow webserver
       ```
   
   4. Open `http://localhost:8080` on the host. Log in with `airflow` / `airflow`, then run the DAG.
   
   I tested this with:
   - `apache/airflow:2.6.2-python3.10`
   - `apache/airflow:2.6.3-python3.10`
   - `apache/airflow:2.7.0-python3.10`
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   This can be reproduced using standalone Docker images, see Repro steps above.
   
   ### Anything else
   
   I wonder if this is related to (or fixed by?) https://github.com/apache/airflow/issues/33446 -> https://github.com/apache/airflow/pull/33732? (The latter was "added to the `Airflow 2.7.1` milestone 3 days ago." I can try installing that pre-release code in the container to see whether it fixes this.)
   _edit_: nope, [not fixed](https://github.com/apache/airflow/issues/34023#issuecomment-1703298280)
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
