ciancolo opened a new issue, #54939:
URL: https://github.com/apache/airflow/issues/54939
### Apache Airflow version
3.0.5
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
We are running Airflow 3.0.4 (latest version available on PyPI) with
LocalExecutor. The metastore is a PostgreSQL 16 database hosted on another
machine in the same network.
Since upgrading from Airflow 2.11 to Airflow 3, we’ve been encountering an
issue where sensors are killed before their configured timeout. This only
happens during periods of high load, when multiple DAGs and sensors are running
in parallel.
The sensors are configured as follows:
```
ExternalTaskSensor(
    task_id='my_sensor_Task',
    external_dag_id='external_dag',
    external_task_id='external_task',
    allowed_states=['success'],
    execution_delta=timedelta(seconds=0),
    poke_interval=60,
    mode='reschedule',
    timeout=60 * 60 * 3,
    dag=dag,
)
```
Here is our Airflow configuration:
```
[core]
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16
[database]
sql_alchemy_pool_size = 20
sql_alchemy_max_overflow = 40
sql_alchemy_pool_recycle = 1800
```
Yesterday we increased the pool size and the max overflow parameters for the
database. The issue has improved, but it is still not fully resolved.
We also checked the database to see if it was overloaded or unable to handle
all the requests, but it looks fine.
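For reference, a rough sketch of what the pool settings above allow, assuming SQLAlchemy's default `QueuePool` semantics, where one engine can open at most `pool_size + max_overflow` connections. The function names and the `n_processes` value are hypothetical, chosen for illustration only; the real number of Airflow processes holding their own engine depends on the deployment.

```python
def max_connections_per_engine(pool_size: int, max_overflow: int) -> int:
    """Upper bound on open connections for one SQLAlchemy QueuePool."""
    return pool_size + max_overflow


def total_connection_ceiling(pool_size: int, max_overflow: int, n_processes: int) -> int:
    """Upper bound if n_processes each hold their own engine (assumption)."""
    return n_processes * max_connections_per_engine(pool_size, max_overflow)


# Values from the [database] section above.
per_engine = max_connections_per_engine(pool_size=20, max_overflow=40)

# Hypothetical: three processes, each with its own pool.
ceiling = total_connection_ceiling(pool_size=20, max_overflow=40, n_processes=3)

print(f"per engine: {per_engine}, across 3 processes: {ceiling}")
```

Comparing this ceiling with the `max_connections` setting on the PostgreSQL side is one way to rule out connection exhaustion.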
Looking at the logs, we find the following errors in the scheduler log:
```
{base_executor.py:512} DEBUG - Changing state:
TaskInstanceKey(dag_id='sensor_race_test', task_id='test_sensor_38',
run_id='scheduled__2024-01-10T00:00:00+00:00', try_number=2, map_index=-1)
{base_executor.py:517} DEBUG - Could not find key:
TaskInstanceKey(dag_id='sensor_race_test', task_id='test_sensor_38',
run_id='scheduled__2024-01-10T00:00:00+00:00', try_number=2, map_index=-1)
{scheduler_job_runner.py:811} INFO - Received executor event with state
success for task instance TaskInstanceKey(dag_id='sensor_race_test',
task_id='test_sensor_38', run_id='scheduled__2024-01-10T00:00:00+00:00',
try_number=2, map_index=-1)
{scheduler_job_runner.py:853} INFO - TaskInstance Finished:
dag_id=sensor_race_test, task_id=test_sensor_38,
run_id=scheduled__2024-01-10T00:00:00+00:00, map_index=-1,
run_start_date=2025-08-26 09:33:41.915710+00:00, run_end_date=2025-08-26
09:33:48.797037+00:00, run_duration=6.881327, state=scheduled,
executor=LocalExecutor(parallelism=32), executor_state=success, try_number=2,
max_tries=1, pool=default_pool, queue=default, priority_weight=1,
operator=PythonSensor, queued_dttm=2025-08-26 09:33:41.700085+00:00,
scheduled_dttm=2025-08-26 09:33:54.249313+00:00,queued_by_job_id=10439623,
pid=1969715
{scheduler_job_runner.py:926} ERROR - DAG 'sensor_race_test' for task
instance <TaskInstance: sensor_race_test.test_sensor_38
scheduled__2024-01-10T00:00:00+00:00 [scheduled]> not found in serialized_dag
table
{taskinstance.py:1882} ERROR - Executor LocalExecutor(parallelism=32)
reported that the task instance <TaskInstance: sensor_race_test.test_sensor_38
scheduled__2024-01-10T00:00:00+00:00 [scheduled]> finished with state success,
but the task instance's state attribute is scheduled. Learn more:
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
{listener.py:37} DEBUG - Calling 'on_task_instance_failed' with
{'previous_state': <TaskInstanceState.RUNNING: 'running'>, 'task_instance':
<TaskInstance: sensor_race_test.test_sensor_38
scheduled__2024-01-10T00:00:00+00:00 [failed]>, 'error': "Executor
LocalExecutor(parallelism=32) reported that the task instance <TaskInstance:
sensor_race_test.test_sensor_38 scheduled__2024-01-10T00:00:00+00:00
[scheduled]> finished with state success, but the task instance's state
attribute is scheduled. Learn more:
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally"}
{taskinstance.py:2005} INFO - Marking task as FAILED.
dag_id=sensor_race_test, task_id=test_sensor_38,
run_id=scheduled__2024-01-10T00:00:00+00:00, logical_date=20240110T000000,
start_date=20250826T093341, end_date=20250826T093354
```
No errors appear in the task logs or any other logs.
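To count how many task instances are affected, the scheduler log can be scanned for the executor/state mismatch message shown above. This is only a sketch: the regular expression is derived from the wording of that one log line, and the names `MISMATCH_RE` and `find_state_mismatches` are hypothetical helpers, not part of Airflow.

```python
import re

# Pattern based on the error message wording in the scheduler log above.
MISMATCH_RE = re.compile(
    r"reported that the task instance <TaskInstance: (?P<ti>\S+) "
    r"(?P<run_id>\S+) \[(?P<ti_state>\w+)\]> "
    r"finished with state (?P<executor_state>\w+)"
)


def find_state_mismatches(log_text: str) -> list[dict[str, str]]:
    """Return unique executor/state mismatches found in a scheduler log."""
    # Collapse all whitespace so wrapped log lines still match.
    normalized = " ".join(log_text.split())
    seen = set()
    results = []
    for match in MISMATCH_RE.finditer(normalized):
        record = match.groupdict()
        key = tuple(sorted(record.items()))
        # The same message is logged again by the listener call, so dedupe.
        if key not in seen:
            seen.add(key)
            results.append(record)
    return results


SAMPLE = """
{taskinstance.py:1882} ERROR - Executor LocalExecutor(parallelism=32)
reported that the task instance <TaskInstance: sensor_race_test.test_sensor_38
scheduled__2024-01-10T00:00:00+00:00 [scheduled]> finished with state success,
but the task instance's state attribute is scheduled.
"""

for record in find_state_mismatches(SAMPLE):
    print(record["ti"], record["ti_state"], record["executor_state"])
```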
### What you think should happen instead?
_No response_
### How to reproduce
We can reproduce the issue using this minimal DAG that creates 50 Python
sensors:
```
from airflow import DAG
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.python import PythonSensor
from datetime import timedelta, datetime
import random
import time


# Dummy function that "simulates" an external condition.
# It almost always returns False, so the sensor stays in reschedule
# and generates many requests to the scheduler.
def fake_condition():
    # Simulate a low probability of "success"
    return random.random() < 0.01


def sensor_fn():
    return PokeReturnValue(is_done=fake_condition())


def make_sensor(task_id):
    return PythonSensor(
        task_id=task_id,
        python_callable=sensor_fn,
        poke_interval=5,  # very short, to stress the scheduler
        mode="reschedule",  # essential to generate load on the scheduler
        timeout=600,  # the sensor gives up after 10 minutes
    )


def make_dag(dag_id):
    with DAG(
        dag_id=dag_id,
        start_date=datetime(2024, 1, 10),
        schedule="@once",
        catchup=False,
        default_args={"owner": "airflow", "retries": 0},
        tags=["test", "sensor"],
    ) as dag:
        # Generate many parallel sensors
        for i in range(50):  # can be raised to 50-100 for more stress
            make_sensor(f"test_sensor_{i}")
    return dag


globals()["sensor_race_test"] = make_dag("sensor_race_test")
```
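As a back-of-the-envelope estimate of the load this DAG produces, the sketch below computes an upper bound on how often sensors are rescheduled. It assumes every poke returns False and ignores scheduling latency and the time each poke takes to run, so the real rate will be lower. The function names are hypothetical.

```python
def reschedules_per_second(n_sensors: int, poke_interval_s: float) -> float:
    """Upper bound on sensor task starts per second in reschedule mode."""
    return n_sensors / poke_interval_s


def max_pokes_before_timeout(timeout_s: int, poke_interval_s: int) -> int:
    """Upper bound on pokes a single sensor makes before it times out."""
    return timeout_s // poke_interval_s


# Values from the reproduction DAG above.
rate = reschedules_per_second(n_sensors=50, poke_interval_s=5)
pokes = max_pokes_before_timeout(timeout_s=600, poke_interval_s=5)

print(f"up to {rate} task starts per second")
print(f"up to {pokes} pokes per sensor, {50 * pokes} in total")
```

Each of those task starts goes through the executor and produces an executor event that the scheduler must reconcile with the task instance state.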
### Operating System
Ubuntu 22.04.5 LTS
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)