arkadiuszbach opened a new pull request, #68905:
URL: https://github.com/apache/airflow/pull/68905

   ## What
   
   `create_celery_app()` is now cached by the resolved (team-aware) app name, so
   a Celery app is built once per process and reused on subsequent sends instead
   of being rebuilt on every workload send.
   
   ## Why / Root cause
   
   `CeleryExecutor._send_workloads_to_celery()` has two paths:
   
   - **Parallel path**: sends are fanned out across a `ProcessPoolExecutor`.
     Each subprocess builds its own Celery app and is then torn down, so a fresh
     app per send is harmless.
   - **In-process hot path**: when there is a single task per heartbeat, or
     `sync_parallelism == 1`, `send_workload_to_executor()` runs **directly in
     the long-lived scheduler/executor process**.
   
   `send_workload_to_executor()` calls `create_celery_app()` on every invocation.
   On the in-process path this meant a brand-new Celery app (with new broker /
   result-backend connection pools) was constructed for **every single send**
   and never released, causing a steady memory leak in a long-running process.
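To make the per-send construction concrete, here is a minimal sketch of the two send paths. `FakeCeleryApp`, `create_app_uncached()`, `send_workload()`, and `send_workloads()` are hypothetical stand-ins, not the provider's real code; only the branch condition mirrors the one described above. The fake class just counts how many apps get built.

```python
from __future__ import annotations

from concurrent.futures import ProcessPoolExecutor


class FakeCeleryApp:
    """Hypothetical stand-in for celery.Celery that counts constructions."""

    instances_built = 0

    def __init__(self, name: str) -> None:
        self.name = name
        FakeCeleryApp.instances_built += 1


def create_app_uncached(name: str = "celery_executor") -> FakeCeleryApp:
    # Pre-fix behaviour: a brand-new app on every call.
    return FakeCeleryApp(name)


def send_workload(workload: str) -> str:
    # Each send builds its own app, as send_workload_to_executor() did before the fix.
    app = create_app_uncached()
    return f"{app.name}:{workload}"


def send_workloads(workloads: list[str], sync_parallelism: int) -> list[str]:
    if len(workloads) == 1 or sync_parallelism == 1:
        # In-process hot path: runs inside the long-lived scheduler process,
        # so every send constructs a new app in that same process.
        return [send_workload(w) for w in workloads]
    # Parallel path: apps are built in worker processes that exit with the pool.
    with ProcessPoolExecutor(max_workers=sync_parallelism) as pool:
        return list(pool.map(send_workload, workloads))
```

Calling `send_workloads(["ti"], sync_parallelism=1)` once per simulated heartbeat, 100 times, leaves `FakeCeleryApp.instances_built` at 100: one new app per send, all in the calling process.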
   
   PR #60675 introduced the team-aware app construction under the assumption
   that sends always happen in throwaway subprocesses. That assumption does not
   hold for the in-process path.
   
   ## Fix
   
   - Add a module-level `_celery_app_cache: dict[str, Celery]` keyed by the
     resolved (team-suffixed) app name.
   - In `create_celery_app()`, return the cached app if one exists for that
     name; otherwise build it and store it before returning.
   
   Celery config is static per app name per process, so reuse is safe for both
   call sites:
   - Subprocesses still build their app once (the cache is per-process).
   - The in-process path stops recreating the app on every send, eliminating
     the leak.
   
   ## Testing
   
   - `test_app_is_cached_and_reused`: `create_celery_app(conf)` returns the same
     instance on repeated calls.
   - `test_distinct_teams_get_distinct_apps` (3.2+): different teams get
     distinct apps, and the same team reuses one app.
   - A `_clear_celery_app_cache` autouse fixture isolates the cache within
     `TestCreateCeleryAppTeamIsolation`, and `TestMultiTeamCeleryExecutor`
     setup/teardown clear it too (because the cache is keyed by app name, it
     would otherwise leak built apps across tests). Both clears are required,
     as verified empirically: removing them makes
     `test_team_specific_broker_not_overwritten` fail.
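The test layout above could look roughly like this `unittest` sketch. It re-declares a tiny hypothetical stand-in for the cache and factory so it runs on its own; the real tests are pytest-based and exercise the provider's `create_celery_app()`. The `setUp`/`tearDown` clearing mirrors what the description says `TestMultiTeamCeleryExecutor` does, and the class name `TestCreateCeleryAppCache` is illustrative.

```python
from __future__ import annotations

import unittest

# Hypothetical stand-in for the provider module's cache and factory.
_celery_app_cache: dict[str, object] = {}


def create_celery_app(team_name: str | None = None) -> object:
    name = f"celery_executor_{team_name}" if team_name else "celery_executor"
    if name not in _celery_app_cache:
        _celery_app_cache[name] = object()  # stands in for a built Celery app
    return _celery_app_cache[name]


class TestCreateCeleryAppCache(unittest.TestCase):
    def setUp(self) -> None:
        # Start every test from an empty cache so apps built by earlier
        # tests cannot leak into this one.
        _celery_app_cache.clear()

    def tearDown(self) -> None:
        _celery_app_cache.clear()

    def test_app_is_cached_and_reused(self) -> None:
        self.assertIs(create_celery_app(), create_celery_app())

    def test_distinct_teams_get_distinct_apps(self) -> None:
        app_a = create_celery_app("team_a")
        app_b = create_celery_app("team_b")
        self.assertIsNot(app_a, app_b)
        self.assertIs(app_a, create_celery_app("team_a"))

    def test_cache_starts_empty(self) -> None:
        self.assertEqual(_celery_app_cache, {})
```

Saved as a module, this runs with `python -m unittest`. `test_cache_starts_empty` only passes because `setUp` clears state left behind by the other tests.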
   
   ## Notes
   
   The regression tests fail when the production cache change is reverted.
   
   ## Reproducing
   <details>
     <summary>details</summary>
   
     - apache-airflow-providers-celery==3.20.0, CeleryExecutor, [sync_parallelism](https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/configurations-ref.html#sync-parallelism)=1
     - test_dag
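       ```python
       import time
       from datetime import datetime, timedelta

       # Airflow 3.x import paths
       from airflow.providers.standard.operators.python import PythonOperator
       from airflow.providers.standard.sensors.time_delta import WaitSensor
       from airflow.sdk import DAG


       def test(**context):
           time.sleep(1)


       dag = DAG(
           dag_id="test",
           schedule=timedelta(minutes=1),
           start_date=datetime(2026, 6, 1),
           catchup=False,
           max_active_runs=1,
       )

       with dag:
           # 50 tasks on Celery workers
           for i in range(50):
               PythonOperator(
                   task_id=f"celery_{i}",
                   python_callable=test,
               )

           # 50 tasks on the Triggerer (deferrable async sleep)
           for i in range(50):
               WaitSensor(
                   task_id=f"triggerer_{i}",
                   time_to_wait=timedelta(seconds=1),
                   deferrable=True,
               )
       ```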
     - Let it run for some time: the scheduler process's memory usage grows steadily.
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   - [X] Yes
   Generated-by: [Claude Opus 4.8] following [the guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)