Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]
GitHub user benbenbang closed the discussion with a comment:

> For users who experience the same thing in the future:
>
> Regardless of which queue you use, if you migrate the same queue from the old
> Airflow version to the new one, purge it first to prevent issues.
>
> Also, Airflow only officially supports `Redis` and `RabbitMQ` as Celery
> brokers. Keep that in mind if you use something else, and check whether the
> issue comes from there.

GitHub link: https://github.com/apache/airflow/discussions/61468#discussioncomment-15927808
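The "purge it first" advice can be scripted. A minimal sketch, with the SQS client injected so the helper can be exercised without AWS credentials; the function name is illustrative and not from the thread, while `purge_queue` itself is the real boto3/SQS operation:

```python
# Hypothetical helper for purging a Celery broker queue before pointing
# new-version Airflow workers at it, so stale pre-migration task payloads
# cannot be delivered. Pass boto3.client("sqs") as sqs_client in practice.
def purge_sqs_queue(sqs_client, queue_url: str) -> dict:
    """Drop all pending messages from the queue.

    Note: SQS allows at most one purge per queue every 60 seconds, and
    purging is irreversible.
    """
    return sqs_client.purge_queue(QueueUrl=queue_url)
```

With a real client this would be called as `purge_sqs_queue(boto3.client("sqs"), "https://sqs.eu-west-1.amazonaws.com/xxx/vc-sqs-celery-worker.fifo")`.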
Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]
GitHub user benbenbang closed a discussion: Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues
Hi folks 👋,

We're experiencing critical issues with Airflow `3.1.5` using mixed executors
(`KubernetesExecutor`, `CeleryExecutor`) that prevent `CeleryExecutor` tasks
from running successfully. This is blocking our migration from Airflow `2.x`
to `3.x`.
## Environment
- Airflow Version: `3.1.5`
- Executor Config: `AIRFLOW__CORE__EXECUTOR=KubernetesExecutor,CeleryExecutor`
- Deployment: `Kubernetes (EKS)`
- Celery Broker: `AWS SQS (FIFO queues)`
- Celery Result Backend: `PostgreSQL`
- Database: `PostgreSQL (RDS)`
- Usage:
  - `CeleryExecutor`: `dbt` jobs (high-concurrency, short-duration, SLA-critical)
  - `KubernetesExecutor`: resource-intensive jobs requiring custom pod specs
- ConfigMap:
```yaml
AIRFLOW__CELERY__BROKER_URL=sqs://
AIRFLOW__CELERY__OPERATION_TIMEOUT=30
AIRFLOW__CELERY__FLOWER_PORT=
AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE=kubernetes
AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=celery_config.CELERY_CONFIG
AIRFLOW__CELERY__SSL_ACTIVE=false
AIRFLOW__CELERY__FLOWER_HOST=0.0.0.0
AIRFLOW__CELERY__SYNC_PARALLELISM=0
```
- Celery Config:
```json
{
  "accept_content": ["json"],
  "event_serializer": "json",
  "worker_prefetch_multiplier": 1,
  "task_acks_late": true,
  "task_default_queue": "default",
  "task_default_exchange": "default",
  "task_track_started": true,
  "broker_url": "sqs://",
  "broker_transport_options": {
    "visibility_timeout": 32000,
    "region_name": "eu-west-1",
    "region": "eu-west-1",
    "predefined_queues": {
      "vc-sqs-celery-worker.fifo": {
        "url": "https://sqs.eu-west-1.amazonaws.com/xxx/vc-sqs-celery-worker.fifo"
      }
    }
  },
  "broker_connection_retry_on_startup": true,
  "result_backend": "postgresql",
  "database_engine_options": {},
  "worker_concurrency": 16,
  "worker_enable_remote_control": true,
  "worker_redirect_stdouts": false,
  "worker_hijack_root_logger": false
}
```
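The executor setting above (`AIRFLOW__CORE__EXECUTOR=KubernetesExecutor,CeleryExecutor`) is a comma-separated list: with multiple-executor configuration (Airflow 2.10+), the first entry is the environment-wide default and the others are opted into per task. A stdlib-only sketch of that rule (the helper is illustrative, not Airflow's actual parser):

```python
import os

def parse_executors(value: str) -> tuple[str, list[str]]:
    """Split the comma-separated executor setting.

    The first entry is the default executor; the rest are only used by
    tasks that select them via the `executor=` parameter.
    """
    names = [name.strip() for name in value.split(",") if name.strip()]
    return names[0], names

# Example: read the same env var this deployment sets.
default, available = parse_executors(
    os.environ.get("AIRFLOW__CORE__EXECUTOR", "KubernetesExecutor,CeleryExecutor")
)
```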
## Configuration
DAG-level executor specification:
```yaml
# config.yaml
dag:
default_args:
executor: CeleryExecutor
pool: pool_dbt_hourly_p0
retries: 2
start_date: "2020-11-30 00:00:00"
```
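`default_args` such as `executor` and `pool` above apply to every task that does not override them; task-level kwargs win. A stdlib-only sketch of that precedence rule (the helper is illustrative, not Airflow's implementation):

```python
def resolve_task_args(default_args: dict, task_kwargs: dict) -> dict:
    """Task-level kwargs override DAG-level default_args, mirroring how
    Airflow applies default_args when constructing operators."""
    return {**default_args, **task_kwargs}

defaults = {"executor": "CeleryExecutor", "pool": "pool_dbt_hourly_p0", "retries": 2}
# A task that overrides only retries still inherits executor and pool.
resolve_task_args(defaults, {"retries": 0})
```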
Celery queue configuration:
```python
# celery_config.py
# Merged with DEFAULT_CELERY_CONFIG from
# airflow.providers.celery.executors.default_celery to become CELERY_CONFIG.
EXTRA_CELERY_CONFIG = {
    "broker_transport_options": {
        "visibility_timeout": 32000,
        "region_name": "eu-west-1",
        "predefined_queues": {
            "vc-sqs-celery-worker.fifo": {
                "url": "https://sqs.eu-west-1.amazonaws.com/xxx/vc-sqs-celery-worker.fifo",
            },
        },
    },
}
```
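The comment in `celery_config.py` says `EXTRA_CELERY_CONFIG` is merged with `DEFAULT_CELERY_CONFIG` to become `CELERY_CONFIG`, but the merge itself isn't shown in the thread. A minimal sketch of one way to do it, assuming both are plain dicts; the helper name and the one-level-deep merge are illustrative, not from the discussion:

```python
# Hypothetical merge: start from a copy of the base config and overlay the
# extras, merging nested dicts one level deep so keys like
# "broker_transport_options" are extended rather than wholly replaced.
def merge_celery_config(base: dict, extra: dict) -> dict:
    merged = dict(base)
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = {**merged[key], **value}
        else:
            merged[key] = value
    return merged

# In the real module, `base` would be DEFAULT_CELERY_CONFIG imported from
# airflow.providers.celery.executors.default_celery.
```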
Queue passed to tasks:
```python
# DAG code
queue = dag_config.DEFAULT_CELERY_QUEUE # "vc-sqs-celery-worker.fifo"
task = DbtBaseOperator(
task_id="run_model",
queue=queue,
# ... other params
)
```
### Evidence of correct configuration (importing the DAG and checking in the pod)
```python
In [1]: dag.task_dict
Out[1]:
{'start': ...,
 'run_product': ...,
 'test_product': ...,
 'snapshot_product': ...,
 'end': ...}
In [2]: dag.task_dict["run_product"].queue
Out[2]: 'vc-sqs-celery-worker.fifo'
In [3]: dag.task_dict["run_product"].executor
Out[3]: 'CeleryExecutor'
```
Tasks are correctly configured with both `executor` and `queue` parameters.
## The Issues
### Issue 1: Task Instance Not Found (CRITICAL - blocks all CeleryExecutor tasks)

**Tasks with `executor: CeleryExecutor` consistently fail with "Task Instance not found" errors, preventing any task execution.**
Celery Worker Logs:
```
2026-02-04T15:17:15.096Z [info] [9e301d75...] Executing workload in Celery:
ti=TaskInstance(id=UUID('019c293a-994b-7abe-839f-4bcb7c1b2e1f'),
task_id='run_product', dag_id='dbt_hourly_p0_product',
run_id='scheduled__2026-02-04T14:30:00+00:00', try_number=4,
queue='vc-sqs-celery-worker.fifo', ...)
2026-02-04T15:17:15.565Z [info] Secrets backends loaded for worker
[15-30 seconds of DAG code loading, Variable API calls]
2026-02-04T15:17:45.678Z [info] Process exited exit_code=-9 signal_sent=SIGKILL
2026-02-04T15:17:45.692Z [error] Task execute_workload[9e301d75...] raised
unexpected:
ServerResponseError('Server returned error')
Traceback:
File "airflow/sdk/execution_time/supervisor.py", line 966, in
_on_child_started
ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
airflow.sdk.api.client.ServerResponseError: Server returned error
Correlation-id=019c293b-1c2a-7cdd-9b47-294f34d0a7c5
2026-02-04T15:17:48.482Z [error] Server indicated the task shouldn't be running
anymore
[supervisor] detail={'detail': {'reason': 'not_found',
'message': 'Task Instance not found'}} status_code=404
```
Scheduler Logs:
```
2026…
```
Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]
GitHub user o-nikolas added a comment to the discussion:

The first sentence in that config's description reads:

> This section only applies if you are using the `CeleryKubernetesExecutor` in
> [core] section above

You are not using this executor (which is old and deprecated at this point).

GitHub link: https://github.com/apache/airflow/discussions/61468#discussioncomment-15761893
Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]
GitHub user benbenbang added a comment to the discussion:

Hello Niko,

Thanks for the message. The config I use comes from
[here](https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/configurations-ref.html#celery-kubernetes-executor),
which seems to be the latest Celery provider version (3.15.2).

Maybe @potiuk can help confirm these docs, please? If they're outdated, I can help to drop them.

GitHub link: https://github.com/apache/airflow/discussions/61468#discussioncomment-15733415
Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]
GitHub user o-nikolas added a comment to the discussion:

I see you're using the config `AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE=kubernetes`.
This is for the old (deprecated) Celery/k8s hybrid executor and is not to be used with the
multiple executor configuration. I'm wondering if you have a setup that is half converted to
the modern multiple executor config.

> Is the per-task executor= parameter fully supported in Airflow 3.x mixed
> executor setups? The documentation suggests it is (since 2.6+), but we're
> experiencing critical failures.

Multiple executor config was released in 2.10; where in the docs do you see it say this should
work with 2.6? I'm wondering if you're mixing configuration between versions.

Lastly, I know that Celery can be quite unstable, with tasks being lost in some circumstances,
or issues with the task SDK API (task tokens expiring by the time they reach the worker). It's
worth looking more into this, which is unrelated to multiple executor configuration.

GitHub link: https://github.com/apache/airflow/discussions/61468#discussioncomment-15729791
Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]
GitHub user benbenbang edited a comment on the discussion:

For taskflow, this works:

```python
@task(queue="xxx")
```

GitHub link: https://github.com/apache/airflow/discussions/61468#discussioncomment-15706221