Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]

2026-02-25 Thread via GitHub


GitHub user benbenbang closed the discussion with a comment: Airflow 3.1.5 
Mixed Executors - Task Instance Not Found & State Desync Issues

> For users who experience the same thing in the future:
> 
> Regardless of which queue you use, if you migrate the same queue from the old 
> Airflow version to the new one, purge it first to prevent issues.
> 
> Also, Airflow only officially supports `Redis` and `RabbitMQ` as Celery brokers. Keep that in mind if you use something else.

Please check whether the issue is coming from here:

GitHub link: 
https://github.com/apache/airflow/discussions/61468#discussioncomment-15927808


This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]



Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]

2026-02-25 Thread via GitHub


GitHub user benbenbang closed a discussion: Airflow 3.1.5 Mixed Executors - 
Task Instance Not Found & State Desync Issues

Hi folks 👋,

We're experiencing critical issues with Airflow `3.1.5` using mixed executors 
(`KubernetesExecutor`,
`CeleryExecutor`) that prevent `CeleryExecutor` tasks from running 
successfully. This is blocking our
migration from Airflow `2.x` to `3.x`.

## Environment

- Airflow Version: `3.1.5`
- Executor Config: `AIRFLOW__CORE__EXECUTOR=KubernetesExecutor,CeleryExecutor`
- Deployment: `Kubernetes (EKS)`
- Celery Broker: `AWS SQS (FIFO queues)`
- Celery Result Backend: `PostgreSQL`
- Database: `PostgreSQL (RDS)`
- Usages:
  - `CeleryExecutor`: `dbt` jobs (high-concurrency, short-duration, SLA-critical)
  - `KubernetesExecutor`: Resource-intensive jobs requiring custom pod specs
- ConfigMap:
  ```yaml
  AIRFLOW__CELERY__BROKER_URL=sqs://
  AIRFLOW__CELERY__OPERATION_TIMEOUT=30
  AIRFLOW__CELERY__FLOWER_PORT=
  AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE=kubernetes
  AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=celery_config.CELERY_CONFIG
  AIRFLOW__CELERY__SSL_ACTIVE=false
  AIRFLOW__CELERY__FLOWER_HOST=0.0.0.0
  AIRFLOW__CELERY__SYNC_PARALLELISM=0
  ```
- Celery Config:
  ```json
  {
    "accept_content": ["json"],
    "event_serializer": "json",
    "worker_prefetch_multiplier": 1,
    "task_acks_late": true,
    "task_default_queue": "default",
    "task_default_exchange": "default",
    "task_track_started": true,
    "broker_url": "sqs://",
    "broker_transport_options": {
      "visibility_timeout": 32000,
      "region_name": "eu-west-1",
      "region": "eu-west-1",
      "predefined_queues": {
        "vc-sqs-celery-worker.fifo": {
          "url": "https://sqs.eu-west-1.amazonaws.com/xxx/vc-sqs-celery-worker.fifo"
        }
      }
    },
    "broker_connection_retry_on_startup": true,
    "result_backend": "postgresql",
    "database_engine_options": {},
    "worker_concurrency": 16,
    "worker_enable_remote_control": true,
    "worker_redirect_stdouts": false,
    "worker_hijack_root_logger": false
  }
  ```
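For context on the ConfigMap entries above: Airflow maps environment variables named `AIRFLOW__{SECTION}__{KEY}` onto its config sections, which is why `AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE` lands in a different section than the `AIRFLOW__CELERY__*` settings. A minimal sketch of that naming convention (the helper function is mine, not part of Airflow's API):

```python
def split_airflow_env(name: str) -> tuple[str, str]:
    """Split an AIRFLOW__SECTION__KEY variable name into (section, key).

    Illustrative helper mirroring Airflow's documented naming convention;
    the real parser also matches keys against known config options.
    """
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        raise ValueError(f"not an Airflow config variable: {name}")
    # The section ends at the first double underscore after the prefix.
    section, _, key = name[len(prefix):].partition("__")
    return section.lower(), key.lower()

print(split_airflow_env("AIRFLOW__CELERY__BROKER_URL"))  # ('celery', 'broker_url')
```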


## Configuration

DAG-level executor specification:
```yaml
# config.yaml
dag:
  default_args:
    executor: CeleryExecutor
    pool: pool_dbt_hourly_p0
    retries: 2
    start_date: "2020-11-30 00:00:00"
```

Celery queue configuration:
```python
# celery_config.py
# Merged with DEFAULT_CELERY_CONFIG (from
# airflow.providers.celery.executors.default_celery) to become CELERY_CONFIG.

EXTRA_CELERY_CONFIG = {
    "broker_transport_options": {
        "visibility_timeout": 32000,
        "region_name": "eu-west-1",
        "predefined_queues": {
            "vc-sqs-celery-worker.fifo": {
                "url": "https://sqs.eu-west-1.amazonaws.com/xxx/vc-sqs-celery-worker.fifo",
            },
        },
    },
}
```
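The merge mentioned in the comment above can be sketched as a recursive dict merge. This is illustrative only: `deep_merge` is my helper, and the `DEFAULT_CELERY_CONFIG` here is a trimmed stand-in rather than the real dict from `airflow.providers.celery.executors.default_celery`:

```python
# Trimmed stand-in for Airflow's defaults so the example is self-contained;
# the values are illustrative, not the provider's actual defaults.
DEFAULT_CELERY_CONFIG = {
    "task_default_queue": "default",
    "broker_transport_options": {"visibility_timeout": 21600},
}

EXTRA_CELERY_CONFIG = {
    "broker_transport_options": {
        "visibility_timeout": 32000,
        "region_name": "eu-west-1",
    },
}

def deep_merge(base: dict, extra: dict) -> dict:
    """Return a new dict where nested dicts are merged, extras winning."""
    merged = dict(base)
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

CELERY_CONFIG = deep_merge(DEFAULT_CELERY_CONFIG, EXTRA_CELERY_CONFIG)
```

With this, the extras override `visibility_timeout` while the untouched defaults (like `task_default_queue`) survive the merge.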

Queue passed to tasks:
```python
# DAG code
queue = dag_config.DEFAULT_CELERY_QUEUE  # "vc-sqs-celery-worker.fifo"
task = DbtBaseOperator(
    task_id="run_model",
    queue=queue,
    # ... other params
)
```

### Evidence of correct configuration (by importing the DAG and inspecting it in the pod):
```python
In [1]: dag.task_dict
Out[1]:
{'start': ,
 'run_product': ,
 'test_product': ,
 'snapshot_product': ,
 'end': }

In [2]: dag.task_dict["run_product"].queue
Out[2]: 'vc-sqs-celery-worker.fifo'

In [3]: dag.task_dict["run_product"].executor
Out[3]: 'CeleryExecutor'
```

Tasks are correctly configured with both `executor` and `queue` parameters.
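The manual inspection above can be turned into a small guard that runs in CI or at import time. `assert_celery_tasks_routed` is a hypothetical helper of mine, not an Airflow API; it only relies on the `task_dict`, `executor`, and `queue` attributes shown in the snippets above:

```python
def assert_celery_tasks_routed(dag, queue: str) -> None:
    """Fail fast if any task targeting CeleryExecutor lacks the expected
    queue -- mirrors the manual check done in the pod above."""
    for task_id, task in dag.task_dict.items():
        if getattr(task, "executor", None) == "CeleryExecutor":
            assert task.queue == queue, f"{task_id} routed to {task.queue!r}"
```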

## The Issues

### Issue 1: Task Instance Not Found (CRITICAL: blocks all CeleryExecutor tasks)

**Tasks with `executor: CeleryExecutor` consistently fail with "Task Instance not found" errors, preventing any task execution.**

Celery Worker Logs:
```
2026-02-04T15:17:15.096Z [info] [9e301d75...] Executing workload in Celery:
  ti=TaskInstance(id=UUID('019c293a-994b-7abe-839f-4bcb7c1b2e1f'),
  task_id='run_product', dag_id='dbt_hourly_p0_product',
  run_id='scheduled__2026-02-04T14:30:00+00:00', try_number=4,
  queue='vc-sqs-celery-worker.fifo', ...)

2026-02-04T15:17:15.565Z [info] Secrets backends loaded for worker

[15-30 seconds of DAG code loading, Variable API calls]

2026-02-04T15:17:45.678Z [info] Process exited exit_code=-9 signal_sent=SIGKILL

2026-02-04T15:17:45.692Z [error] Task execute_workload[9e301d75...] raised unexpected:
  ServerResponseError('Server returned error')

  Traceback:
    File "airflow/sdk/execution_time/supervisor.py", line 966, in _on_child_started
      ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
  airflow.sdk.api.client.ServerResponseError: Server returned error
  Correlation-id=019c293b-1c2a-7cdd-9b47-294f34d0a7c5

2026-02-04T15:17:48.482Z [error] Server indicated the task shouldn't be running anymore
  [supervisor] detail={'detail': {'reason': 'not_found', 'message': 'Task Instance not found'}} status_code=404
```

Scheduler Logs:
```
2026
```

Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]

2026-02-10 Thread via GitHub


GitHub user o-nikolas added a comment to the discussion: Airflow 3.1.5 Mixed 
Executors - Task Instance Not Found & State Desync Issues

The first sentence in that config's description reads:

> This section only applies if you are using the `CeleryKubernetesExecutor` in 
> [core] section above

You are not using this executor (which is old and deprecated at this point).

GitHub link: 
https://github.com/apache/airflow/discussions/61468#discussioncomment-15761893





Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]

2026-02-08 Thread via GitHub


GitHub user benbenbang added a comment to the discussion: Airflow 3.1.5 Mixed 
Executors - Task Instance Not Found & State Desync Issues

For users who experience the same thing in the future:

Regardless of which queue you use, if you migrate the same queue from the old 
Airflow version to the new one, purge it first to prevent issues.

Also, Airflow only officially supports `Redis` and `RabbitMQ` as Celery brokers. Keep that in mind if you use something else.
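One way to script the purge before cutover. This is a sketch: the helper name is mine, the client is any boto3-style SQS client, and the queue URL in the usage note is a placeholder:

```python
def purge_before_migration(sqs_client, queue_url: str) -> None:
    """Drop all pending messages before the new Airflow version starts
    consuming. On real SQS, PurgeQueue is destructive and allowed at
    most once per 60 seconds per queue."""
    sqs_client.purge_queue(QueueUrl=queue_url)

# Usage with boto3 (not executed here):
#   import boto3
#   sqs = boto3.client("sqs", region_name="eu-west-1")
#   purge_before_migration(sqs, "https://sqs.eu-west-1.amazonaws.com/xxx/your-queue.fifo")
```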

GitHub link: 
https://github.com/apache/airflow/discussions/61468#discussioncomment-15733474





Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]

2026-02-08 Thread via GitHub


GitHub user benbenbang added a comment to the discussion: Airflow 3.1.5 Mixed 
Executors - Task Instance Not Found & State Desync Issues

Hello Niko,

Thanks for the message.

The config I use comes from [here](https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/configurations-ref.html#celery-kubernetes-executor), which seems to be the latest Celery provider version (3.15.2).

Maybe @potiuk can help confirm these docs, please? If it's outdated, I can help drop it.

GitHub link: 
https://github.com/apache/airflow/discussions/61468#discussioncomment-15733415





Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]

2026-02-07 Thread via GitHub


GitHub user o-nikolas added a comment to the discussion: Airflow 3.1.5 Mixed 
Executors - Task Instance Not Found & State Desync Issues

I see you're using the config `AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE=kubernetes`.

This is for the old (deprecated) Celery/k8s hybrid executor, not to be used 
with multiple executor configuration. I'm wondering if you have a setup that is 
half converted to the modern multiple executor config.

> Is the per-task executor= parameter fully supported in Airflow 3.x mixed executor setups? The documentation suggests it is (since 2.6+), but we're experiencing critical failures.

Multiple executor config was released in 2.10, where in the docs do you see it 
say this should work with 2.6? I'm wondering if you're mixing configuration 
between versions.

Lastly, I know that Celery can be quite unstable, with tasks being lost in some circumstances, or issues with the task SDK API (task tokens expiring by the time they reach the worker). It's worth looking into this further, though it's unrelated to multiple executor configuration.


GitHub link: 
https://github.com/apache/airflow/discussions/61468#discussioncomment-15729791





Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]

2026-02-05 Thread via GitHub


GitHub user benbenbang edited a comment on the discussion: Airflow 3.1.5 Mixed 
Executors - Task Instance Not Found & State Desync Issues

For TaskFlow, this works:
```python
@task(queue="xxx")
```

GitHub link: 
https://github.com/apache/airflow/discussions/61468#discussioncomment-15706221







Re: [D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]

2026-02-04 Thread via GitHub


GitHub user benbenbang edited a discussion: Airflow 3.1.5 Mixed Executors - 
Task Instance Not Found & State Desync Issues

[D] Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues [airflow]

2026-02-04 Thread via GitHub


GitHub user benbenbang created a discussion: Airflow 3.1.5 Mixed Executors - 
Task Instance Not Found & State Desync Issues
