arkadiuszbach opened a new pull request, #68905:
URL: https://github.com/apache/airflow/pull/68905
## What
`create_celery_app()` is now cached by the resolved (team-aware) app name, so a
Celery app is built once per process and reused on subsequent sends instead of
being rebuilt on every workload send.
## Why / Root cause
`CeleryExecutor._send_workloads_to_celery()` has two paths:
- **Parallel path**: sends are fanned out across a `ProcessPoolExecutor`. Each
  subprocess builds its own Celery app and is then torn down, so a fresh app per
  send is harmless.
- **In-process hot path**: when there is a single task per heartbeat, or
  `sync_parallelism == 1`, `send_workload_to_executor()` runs **directly in the
  long-lived scheduler/executor process**.

`send_workload_to_executor()` calls `create_celery_app()` on every invocation.
On the in-process path this meant a brand-new Celery app (with new broker /
result-backend connection pools) was constructed for **every single send** and
never released, which is a steady memory leak in a long-running process.

PR #60675 introduced the team-aware app construction under the assumption that
sends always happen in throwaway subprocesses. That assumption does not hold for
the in-process path.
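
To make the root cause concrete, here is a minimal, self-contained model of the in-process path. It is only a sketch: `StubApp`, `create_app_uncached`, `send_in_process`, and `takes_in_process_path` are hypothetical stand-ins, not the provider's code, and the stub merely counts constructions rather than opening broker or result-backend pools.

```python
from dataclasses import dataclass, field
from itertools import count

_build_counter = count(1)


@dataclass
class StubApp:
    """Hypothetical stand-in for a Celery app (a real one owns connection pools)."""

    name: str
    build_id: int = field(default_factory=lambda: next(_build_counter))


def create_app_uncached(name: str) -> StubApp:
    """Model of the pre-fix factory: every call constructs a new app."""
    return StubApp(name)


def takes_in_process_path(num_tasks: int, sync_parallelism: int) -> bool:
    """The condition described above: a single task, or sync_parallelism == 1."""
    return num_tasks == 1 or sync_parallelism == 1


def send_in_process(workloads, app_factory) -> list:
    """Model of the hot path: one factory call per send, in the same process."""
    apps_used = []
    for _ in workloads:
        apps_used.append(app_factory("celery-executor"))
    return apps_used


if __name__ == "__main__":
    apps = send_in_process(range(5), create_app_uncached)
    # Five sends produce five distinct app objects in one long-lived process.
    print(len({app.build_id for app in apps}))
```

In this model the number of constructed apps grows linearly with the number of sends, which mirrors the growth described above for the long-lived scheduler/executor process.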
## Fix
- Add a module-level `_celery_app_cache: dict[str, Celery]` keyed by the
  resolved (team-suffixed) app name.
- In `create_celery_app()`, return the cached app if one exists for that name;
  otherwise build it and store it before returning.

Celery config is static per app-name per process, so reuse is safe for both call
sites:
- Subprocesses still build their app once (cache is per-process).
- The in-process path stops recreating the app on every send, eliminating the
  leak.
## Testing
- `test_app_is_cached_and_reused`: `create_celery_app(conf)` returns the same
  instance on repeated calls.
- `test_distinct_teams_get_distinct_apps` (3.2+): different teams get distinct
  apps, same team reuses one app.
- A `_clear_celery_app_cache` autouse fixture isolates the cache within
  `TestCreateCeleryAppTeamIsolation`, and `TestMultiTeamCeleryExecutor`
  setup/teardown clear it too (the cache, being keyed by app name, otherwise
  leaks built apps across tests). Both clears are required: removing them makes
  `test_team_specific_broker_not_overwritten` fail (verified empirically).
## Notes
The regression tests fail when the production cache change is reverted.
## Reproducing
<details>
<summary>details</summary>
- apache-airflow-providers-celery==3.20.0, CeleryExecutor,
[sync_parallelism](https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/configurations-ref.html#sync-parallelism)=1
- test_dag
```
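# Import paths assume Airflow 3 with the standard provider installed.
import time
from datetime import datetime, timedelta

from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.time_delta import WaitSensor
from airflow.sdk import DAG


def test(**context):
    # Keep each Celery task short so sends happen frequently.
    time.sleep(1)


dag = DAG(
    dag_id='test',
    schedule=timedelta(minutes=1),
    start_date=datetime(2026, 6, 1),
    catchup=False,
    max_active_runs=1,
)

with dag:
    # 50 tasks on Celery worker
    for i in range(50):
        PythonOperator(
            task_id=f"celery_{i}",
            python_callable=test,
        )

    # 50 tasks on Triggerer (deferrable async sleep)
    for i in range(50):
        WaitSensor(
            task_id=f"triggerer_{i}",
            time_to_wait=timedelta(seconds=1),
            deferrable=True,
        )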
```
- Let it run for some time; memory usage of the long-lived scheduler/executor process keeps growing.
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes
Generated-by: [Claude Opus 4.8] following [the
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)