nathadfield opened a new issue, #66877:
URL: https://github.com/apache/airflow/issues/66877

   ### Under which category would you file this issue?
   
   Providers
   
   ### Apache Airflow version
   
   Observed on Airflow 3.1.8 and reproduced against current `main`.
   
   ### What happened and how to reproduce it?
   
   **Issue Description**
   
   `ExternalTaskSensor` can report success too early on Airflow 3 when it is 
configured with `external_task_group_id`.
   
   The Airflow 3 task-group path asks the Execution API for a map of task 
states, then counts a Dag run as matching when every observed truthy state is 
in `allowed_states`. This means a task-group state map containing only `None` 
values, or a mix of successful tasks and `None` values, is counted as a 
successful group.
   
   Each of the following maps is currently counted as one matching Dag run for 
`allowed_states=["success"]`:
   
   ```python
   {"run_id": {"group.task_1": None, "group.task_2": None}}
   {"run_id": {"group.task_1": "success", "group.task_2": None}}
   {"run_id": {"group.task_1": "success"}}
   {"run_id": {}}
   ```
   
   That result is used by both Airflow 3 sensor modes:
   
   - `deferrable=False`: `ExternalTaskSensor._poke_af3(...)`
   - `deferrable=True`: `WorkflowTrigger._get_count_af_3(...)`
   
   The relevant helper is 
`providers/standard/src/airflow/providers/standard/utils/sensor_helper.py`:
   
   ```python
   def _get_count_by_matched_states(run_id_task_state_map: dict[str, dict[str, Any]], states: Collection[str]):
       count = 0
       for _, task_states in run_id_task_state_map.items():
           if all(state in states for state in task_states.values() if state):
               count += 1
       return count
   ```
   
   The `if state` filter removes `None` before the `all(...)` check. As a 
result, all-NULL maps and empty maps reduce to `all([])`, which is vacuously 
`True`, and mixed `success`/NULL maps are checked as if only the successful 
tasks existed.
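
A minimal standalone sketch of the filter behaviour described above. It uses a local copy of the quoted helper, renamed `count_by_matched_states` so it runs without an Airflow checkout. The copy and the sample maps are illustrative assumptions, not an import from the provider:

```python
from __future__ import annotations

from collections.abc import Collection
from typing import Any


def count_by_matched_states(
    run_id_task_state_map: dict[str, dict[str, Any]], states: Collection[str]
) -> int:
    """Local copy of the helper quoted above (renamed; not imported from Airflow)."""
    count = 0
    for _, task_states in run_id_task_state_map.items():
        # `if state` drops None (and any other falsy value) before all() sees it.
        if all(state in states for state in task_states.values() if state):
            count += 1
    return count


allowed = ["success"]

# all() over an empty iterable is vacuously True.
assert all([]) is True

# All-None map: every value is filtered out, so the check becomes all([]).
all_none = {"run_id": {"group.task_1": None, "group.task_2": None}}
assert count_by_matched_states(all_none, allowed) == 1

# Mixed map: only the "success" entry survives the filter.
mixed = {"run_id": {"group.task_1": "success", "group.task_2": None}}
assert count_by_matched_states(mixed, allowed) == 1

# A truthy state outside `allowed` is still caught.
running = {"run_id": {"group.task_1": "success", "group.task_2": "running"}}
assert count_by_matched_states(running, allowed) == 0
```

Only states that are both truthy and outside `allowed_states` can make a run fail the check, which is why `running` blocks the match but `None` does not.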
   
   The Execution API can return NULL task states for task-group queries. A 
freshly created Dag run has task instance rows with `state = NULL`, and the 
`/execution/task-instances/states` endpoint returns those values in its 
`task_states` response.
   
   The issue is specific to the `external_task_group_id` path. Watching 
explicit task IDs with `external_task_ids` uses a different count path.
   
   **Steps to reproduce**
   
   This can be reproduced without running a scheduler or defining a Dag. From 
an Airflow source checkout, run:
   
   ```bash
   uv run --project providers/standard python -c 'from airflow.providers.standard.utils.sensor_helper import _get_count_by_matched_states as f; cases = {"all_none": {"r": {"t1": None, "t2": None}}, "success_plus_none": {"r": {"t1": "success", "t2": None}}, "partial_success": {"r": {"t1": "success"}}, "empty_inner": {"r": {}}, "mixed_running": {"r": {"t1": "success", "t2": "running"}}}; print({name: f(case, ["success"]) for name, case in cases.items()})'
   ```
   
   Current output:
   
   ```python
   {"all_none": 1, "success_plus_none": 1, "partial_success": 1, "empty_inner": 1, "mixed_running": 0}
   ```
   
   The first four cases are not fully successful task groups, but the helper 
counts each of them as a successful Dag run.
   
   To reproduce through the sensor path:
   
   1. Configure `ExternalTaskSensor` with `external_task_group_id`.
   2. Let the matched upstream Dag run exist while the task instances in the 
group still have `state = NULL`, or while only a subset of the group's task 
states is visible as `success`.
   3. The sensor receives the state map from the Execution API.
   4. `_get_count_by_matched_states(...)` returns `1`.
   5. With one matched logical date, the sensor treats the external task group 
as successful and completes.
   
   ### What you think should happen instead?
   
   A task group should not be treated as successful until the relevant task 
instances for the matched Dag run are all in `allowed_states`.
   
   Task instances with `state = NULL` have not succeeded, so a state map 
containing only NULL states, or a mix of `success` and NULL states, should not 
satisfy the success condition.
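
One possible shape for the stricter check, as a sketch only and not a proposed patch. The function name `count_fully_matched_runs` and the `expected_task_ids` parameter are hypothetical. The parameter is there because the state map alone cannot reveal that a task is missing from it, so the partial-success case can only be rejected if the caller knows which task IDs the group should contain:

```python
from __future__ import annotations

from collections.abc import Collection
from typing import Any


def count_fully_matched_runs(
    run_id_task_state_map: dict[str, dict[str, Any]],
    states: Collection[str],
    expected_task_ids: Collection[str] | None = None,
) -> int:
    """Count runs whose reported task states are all in `states` (hypothetical helper)."""
    count = 0
    for task_states in run_id_task_state_map.values():
        if not task_states:
            # No task states reported: nothing has succeeded yet.
            continue
        if expected_task_ids is not None and not set(expected_task_ids).issubset(task_states):
            # At least one expected task is missing from the map.
            continue
        # No `if state` filter: None is compared like any other state.
        if all(state in states for state in task_states.values()):
            count += 1
    return count


cases = {
    "all_none": {"r": {"t1": None, "t2": None}},
    "success_plus_none": {"r": {"t1": "success", "t2": None}},
    "partial_success": {"r": {"t1": "success"}},
    "empty_inner": {"r": {}},
    "mixed_running": {"r": {"t1": "success", "t2": "running"}},
    "all_success": {"r": {"t1": "success", "t2": "success"}},
}
results = {
    name: count_fully_matched_runs(case, ["success"], expected_task_ids=["t1", "t2"])
    for name, case in cases.items()
}
print(results)
```

With the expected task IDs supplied, only `all_success` is counted. Without them, `partial_success` would still count as a match, so the fix may also need the caller to pass or resolve the group's task IDs.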
   
   ### Operating System
   
   Not OS-specific. The helper-level reproduction was run on Darwin 25.4.0.
   
   ### Deployment
   
   None
   
   ### Apache Airflow Provider(s)
   
   standard
   
   ### Versions of Apache Airflow Providers
   
   Observed with `apache-airflow-providers-standard==1.6.0` and reproduced 
against current `main`.
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

