seanmuth opened a new issue, #67123:
URL: https://github.com/apache/airflow/issues/67123

   ## Apache Airflow Provider(s)
   
   - `apache-airflow-providers-celery` (regression introduced in 3.16.0, 
present through current)
   
   ## Versions of Apache Airflow Providers
   
   - `apache-airflow-providers-celery==3.16.0` (introducing release) through 
**`3.19.0` (current)** and `main`. Verified the per-publish 
`create_celery_app(_conf)` call in `send_workload_to_executor` is unchanged 
across all releases since 3.16.0.
   - One partial optimization has landed since: `get_celery_configuration()` is 
now `@cache`-decorated, so the config *dict* is built once. The `Celery()` app 
instance itself is still constructed fresh on every publish, so `_backend` is 
still `None` at the start of every `apply_async` call and the `entry_points()` 
scan still runs per publish. The mechanics of the regression are identical to 
3.16.0.
   - `apache-airflow-providers-cncf-kubernetes` is referenced as a 
counterexample.
   
   ## Apache Airflow version
   
   Reproduced on **Airflow 2.11.0** (Astro Runtime 13.4.0). The relevant code 
path is present on `main` as well.
   
   ## What happened?
   
   Since providers-celery 3.16.0 (PR #60675 — *"AIP-67 - Multi Team: Update 
Celery Executor to support multi team"*), `send_task_to_executor` constructs a 
fresh `Celery()` app on every task publish:
   
   ```python
   # providers/celery/.../celery_executor_utils.py
   def send_task_to_executor(task_tuple):
       key, args, queue, team_name = task_tuple
       ...
       celery_app = create_celery_app(_conf)   # NEW: fresh Celery() per publish
       task_to_run = celery_app.tasks["execute_workload"]
       ...
       with timeout(seconds=OPERATION_TIMEOUT):
           result = task_to_run.apply_async(args=args, queue=queue)
   ```
   
   The PR's own commit message acknowledges this explicitly:
   
   > Since sending tasks is parallelized with multiple processes (which do not 
share memory with the parent) the send task logic now re-creates a celery app 
in the sub processes (since the pickling and unpickling that python does to try 
pass state to the sub processes was not reliably creating the correct celery 
app objects).
   
   Each `Celery()` instance starts with `_backend = None`. `apply_async` begins by calling `app.send_task()`, which reads `self.backend`. That property resolves lazily through `_get_backend()` → `backends.by_url()` → `backends.by_name()`, and `by_name()` calls **`load_extension_class_names("celery.result_backends")`**. That function runs `importlib.metadata.entry_points()`, which walks every installed distribution, reads its `entry_points.txt`, and merges plugin-registered backends into the alias map. **The scan runs unconditionally; setting `[celery] result_backend` explicitly does not skip it.**
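
To see how much of the per-publish cost comes from the entry-points scan alone, the scan can be timed in isolation. The following is a minimal standard-library sketch, not Celery's own code: `count_distributions` and `time_entry_points_scan` are hypothetical helper names, and the lookup only approximates what `load_extension_class_names` does.

```python
import importlib.metadata as md
import time


def count_distributions() -> int:
    """Count the installed distributions visible on sys.path."""
    return sum(1 for _ in md.distributions())


def time_entry_points_scan(group: str, iterations: int = 5) -> list[float]:
    """Return the wall-clock time in milliseconds of each entry-points lookup."""
    timings_ms = []
    for _ in range(iterations):
        start = time.perf_counter()
        eps = md.entry_points()
        # Python 3.10+ exposes .select(); older versions return a plain dict.
        if hasattr(eps, "select"):
            selected = eps.select(group=group)
        else:
            selected = eps.get(group, [])
        list(selected)  # force iteration over the matching entry points
        timings_ms.append((time.perf_counter() - start) * 1000.0)
    return timings_ms


if __name__ == "__main__":
    print("distributions:", count_distributions())
    print("scan ms:", time_entry_points_scan("celery.result_backends"))
```

Running this inside the scheduler image gives a rough lower bound for the scan; absolute numbers will vary with page-cache state and any caching inside `importlib.metadata`.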
   
   Pre-3.16.0, `app = _get_celery_app()` was a module-level singleton. The lazy 
backend resolution happened once per subprocess lifetime, then was cached on 
the `Celery` instance. Post-3.16.0, the cache is thrown away every publish, so 
the scan runs once per task enqueued.
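
The difference between the two behaviours can be illustrated with a toy model. This sketch does not use Celery at all: `LazyApp` is a hypothetical stand-in for an app object whose backend is resolved lazily and cached on the instance, and the counter stands in for the expensive `entry_points()` scan.

```python
class LazyApp:
    """Toy stand-in for an app whose backend is resolved lazily."""

    resolutions = 0  # class-wide count of expensive resolutions

    def __init__(self) -> None:
        self._backend = None

    @property
    def backend(self) -> str:
        if self._backend is None:
            LazyApp.resolutions += 1  # stands in for the entry_points() scan
            self._backend = "resolved-backend"
        return self._backend


def publish_with_singleton(n: int) -> int:
    """Pre-3.16.0 shape: one long-lived app, backend resolved once."""
    LazyApp.resolutions = 0
    app = LazyApp()
    for _ in range(n):
        _ = app.backend
    return LazyApp.resolutions


def publish_with_fresh_app(n: int) -> int:
    """Post-3.16.0 shape: a new app per publish, backend resolved every time."""
    LazyApp.resolutions = 0
    for _ in range(n):
        _ = LazyApp().backend
    return LazyApp.resolutions
```

With `n = 100`, the singleton shape performs one resolution and the fresh-app shape performs one hundred, which is the amortization the regression removed.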
   
   This scan's cost scales linearly with the number of installed distributions on disk. On realistic production images (~600+ `*.dist-info` entries), each publish now pays a tax of roughly 50 ms even on the quiet path (see the measurements under Quantitative impact). Under scheduler load and memory pressure, the tail extends past the default 1.0s `operation_timeout`, causing publishes to fail with `AirflowTaskTimeout`, exhaust the configured retry budget (default 3), and end up logged as:
   
   ```
   {celery_executor.py:174} INFO - [Try 1 of 3] Task Timeout Error for Task: ...
   {celery_executor.py:174} INFO - [Try 2 of 3] Task Timeout Error for Task: ...
   {celery_executor.py:174} INFO - [Try 3 of 3] Task Timeout Error for Task: ...
   {celery_executor.py:213} ERROR - Error sending Celery task: Timeout, PID: NNN
   ```
   
   Retries hit the same slow path inside the same subprocess (the new 
`Celery()` instance's `_backend` is still `None`), so retrying does not recover.
   
   ## Quantitative impact
   
   Measured on a controlled astro-runtime 13.4.0 deployment matching a real 
production deployment's scheduler resources (1 vCPU, 2 GiB) and roughly its 
`*.dist-info` count (656 distributions). Each iteration mirrors the per-publish 
path: `create_celery_app(conf)` + `_ = app.backend`. Wall-clock latency in 
milliseconds, n=100 per row:
   
   | Configuration | min | p50 | p95 | p99 | max | iter/s |
   |---|---:|---:|---:|---:|---:|---:|
   | Idle, no load | 48.1 | 51.1 | 67.0 | 82.0 | 86.4 | 18.8 |
   | Idle + 1.5 GiB anon allocation | 48.2 | 52.2 | 88.8 | 94.9 | 100.4 | 17.5 |
   | 5 modest DAGs with mapped tasks running | 52.2 | 88.6 | 232.0 | 362.7 | 385.3 | 9.6 |
   | Same DAG load + 1.0 GiB anon allocation | **51.0** | **111.8** | **498.3** | **697.9** | **717.8** | **5.7** |
   
   For comparison: a real production deployment with this regression (~682 
distributions, ~640 active DAGs, periodic OOMs) shows `max=558ms` and 
consistent `Task Timeout Error` failures at the default 1.0s 
`operation_timeout`. The controlled reproduction *exceeds* that worst case 
using just 5 DAGs, demonstrating that the regression's impact is multiplicative 
across three independent factors:
   
   - Installed distribution count (baseline scan cost)
   - Scheduler activity / CPU contention with the publisher subprocess
   - Memory pressure causing `entry_points.txt` page-cache eviction
   
   In the field, all three compound simultaneously. The 1.0s 
`operation_timeout` default — appropriate for the pre-regression path where 
backend resolution was amortized — is no longer safe at typical production 
deployment sizes.
   
   ## Counterexample: KubernetesExecutor's AIP-67 implementation does not have 
this problem
   
   PR #61798 implemented the same AIP-67 multi-team feature for 
`KubernetesExecutor`. The pattern is cleaner:
   
   ```python
   # providers/cncf/kubernetes/.../executors/kubernetes_executor.py
   def __init__(self, *args, **kwargs):
       super().__init__(*args, **kwargs)
       ...
       self.kube_config = KubeConfig(executor_conf=self.conf)   # once per executor
       ...
   ```
   
   `KubeConfig` is parameterized on the team-aware `executor_conf` but 
constructed *once*, at executor init, and held on the instance. No per-publish 
reconstruction.
   
   The KubernetesExecutor publishes via a long-lived multiprocessing worker 
reading from a managed queue. The worker process holds its Kubernetes client 
across publishes. By contrast, the CeleryExecutor's 
`ProcessPoolExecutor.submit(send_task_to_executor, ...)` per-publish design 
forced the AIP-67 author to push the team-aware app construction *inside* the 
per-call function rather than ahead of it — and the pickling concern cited in 
the commit message was specifically about passing app state through the pool 
boundary, not about needing fresh apps per task semantically.
   
   In other words, the per-publish reconstruction is **not an inherent 
requirement of multi-team support**. It's an artifact of the way the celery 
publish pool was already shaped. A subprocess-local cache keyed on `team_name` 
would satisfy the multi-team correctness requirement without re-paying the 
backend-resolution cost on every publish.
   
   ## Proposed fix
   
   Cache the constructed `Celery` app at module level inside the publisher 
subprocess, keyed on team:
   
   ```python
   # providers/celery/.../celery_executor_utils.py
   from functools import lru_cache
   
   @lru_cache(maxsize=8)   # one app per active team in a given subprocess
   def _cached_celery_app_for_team(team_name: str | None) -> Celery:
       """
       Subprocess-local Celery app cache keyed on team_name.
   
       Subprocesses don't share memory with the parent, so this cache is per-subprocess.
       Within a subprocess, calls for the same team reuse the same Celery() instance,
       preserving the cached backend resolution that pre-3.16.0 enjoyed module-globally.
       """
       if AIRFLOW_V_3_0_PLUS:
           from airflow.executors.base_executor import ExecutorConf
           _conf = ExecutorConf(team_name)
       else:
           _conf = conf
       return create_celery_app(_conf)
   
   
   def send_task_to_executor(task_tuple):
       key, args, queue, team_name = task_tuple
       celery_app = _cached_celery_app_for_team(team_name)   # cached, not reconstructed
       if AIRFLOW_V_3_0_PLUS:
           task_to_run = celery_app.tasks["execute_workload"]
           ...
       else:
           task_to_run = celery_app.tasks["execute_command"]
           ...
       with timeout(seconds=OPERATION_TIMEOUT):
           result = task_to_run.apply_async(args=args, queue=queue)
   ```
   
   This preserves AIP-67's multi-team correctness (apps are still team-specific 
where needed) while restoring per-subprocess amortization for the 
`_get_backend()` → `entry_points()` cost. The fix is local to one function, 
requires no API changes, and aligns the CeleryExecutor's pattern with how 
KubernetesExecutor handled the same AIP-67 requirement.
   
   The `maxsize=8` is a guess at a reasonable per-subprocess team-count ceiling 
and should be configurable, defaulting to something modest. For single-team 
deployments (the vast majority), `team_name` is `None` and a single cache entry 
covers all publishes.
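
One way to make the cache size configurable is to build the cached function from a factory at import time. This is only a sketch under stated assumptions: `make_team_app_cache` and `demo` are hypothetical names, and the `factory` argument stands in for a function that wraps `create_celery_app`; where the `maxsize` value would come from (for example a provider config option) is left open.

```python
from functools import lru_cache
from typing import Callable, Optional


def make_team_app_cache(
    factory: Callable[[Optional[str]], object], maxsize: int = 8
) -> Callable[[Optional[str]], object]:
    """Wrap `factory` in an LRU cache keyed on team_name (None is a valid key)."""

    @lru_cache(maxsize=maxsize)
    def get_app(team_name: Optional[str]) -> object:
        return factory(team_name)

    return get_app


def demo():
    """Show cache hits and what happens when maxsize is smaller than the team count."""
    built = []

    def factory(team_name):
        built.append(team_name)  # record every real construction
        return object()          # stands in for a Celery app

    get_app = make_team_app_cache(factory, maxsize=2)
    first = get_app(None)
    second = get_app(None)   # cache hit: same object, no construction
    get_app("team_a")
    get_app("team_b")        # cache is full: evicts the least recently used key (None)
    third = get_app(None)    # constructed again after eviction
    return first is second, first is third, built
```

The `demo` function shows why the ceiling matters: if a subprocess serves more teams than `maxsize`, evicted apps are rebuilt and the backend-resolution cost is paid again for those teams.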
   
   ## How to reproduce
   
   Summary of the reproduction setup (happy to share the full project skeleton if useful):
   
   1. Astro Runtime 13.4.0 (Airflow 2.11.0), 1 vCPU / 2 GiB scheduler.
   2. `providers-celery==3.17.2`, `celery==5.6.2`.
   3. Inflate the image's distribution count to ~656 via `requirements.txt` (a 
wide set of unused providers and utility packages — count is what matters, not 
which ones).
   4. Five synthetic DAGs with `*/1 * * * *` and `*/2 * * * *` schedules and 
mapped tasks producing ~380 task enqueues/minute.
   5. Inside the scheduler pod, run a 100-iteration timing script measuring 
`create_celery_app(conf); _ = app.backend`. Then layer in a `bytearray(1024 * 
1024 * 1024)` allocation in a second shell. Observe `max` cross 700ms; observe 
`Task Timeout Error for Task` log entries firing.
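
The timing script in step 5 can be sketched as a small harness. This is an illustration, not the exact script used for the table above: `percentile` and `measure` are hypothetical helper names, and the nearest-rank percentile method is an assumption. In the scheduler pod, the callable passed to `measure` would construct the app with `create_celery_app(conf)` and read `app.backend`.

```python
import math
import time
from typing import Callable


def percentile(sorted_values: list[float], pct: float) -> float:
    """Nearest-rank percentile over an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct / 100.0 * len(sorted_values)))
    return sorted_values[rank - 1]


def measure(fn: Callable[[], object], iterations: int = 100) -> dict[str, float]:
    """Time `fn` repeatedly and summarize wall-clock latency in milliseconds."""
    samples_ms = []
    for _ in range(iterations):
        start = time.perf_counter()
        fn()
        samples_ms.append((time.perf_counter() - start) * 1000.0)
    samples_ms.sort()
    total_s = sum(samples_ms) / 1000.0
    return {
        "min": samples_ms[0],
        "p50": percentile(samples_ms, 50),
        "p95": percentile(samples_ms, 95),
        "p99": percentile(samples_ms, 99),
        "max": samples_ms[-1],
        "iter_per_s": iterations / total_s if total_s > 0 else float("inf"),
    }


if __name__ == "__main__":
    # Placeholder workload; replace with the app construction plus backend access.
    print(measure(lambda: sum(range(1000)), iterations=100))
```

Running the same harness before and after layering in the memory allocation makes the shift in `p95`, `p99`, and `max` directly comparable.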
   
   Downgrade `providers-celery` to 3.15.x in the same image; the same 
measurement collapses to roughly the idle baseline regardless of load + 
pressure, because the entry-points scan is amortized over subprocess lifetime.
   
   ## Operating system
   
   Reproduction on `linux/amd64` (astro-runtime base, Debian-based). Not 
OS-specific; the regression is in Python-level code.
   
   ## Deployment
   
   Other 3rd-party Helm chart / managed deployment (Astronomer Hosted 
Execution). Same root cause is present in any deployment of Airflow 2.11.0+ or 
3.x with `apache-airflow-providers-celery >= 3.16.0`.
   
   ## Anything else?
   
   - Workarounds that *do not* help: setting `[celery] result_backend` 
explicitly (the entry-points scan runs unconditionally inside `by_name`); 
pinning to `celery==5.5.x` (same code path on Python 3.11+); removing the 
`importlib_metadata` backport (it's a transitive `apache-airflow` dependency).
   - Workarounds that *do* help, in priority order: (1) raise `[celery] operation_timeout` to a more generous value (e.g. 2-3s), which only stops the cliff failure while the per-publish cost remains; (2) downgrade to `providers-celery==3.15.x` if multi-team is not required.
   - Disabling `AIRFLOW__CORE__MULTI_TEAM` (or running on Airflow 2.x where it 
doesn't exist) does **not** mitigate, because the per-publish 
`create_celery_app` is unconditional and not gated on whether multi-team is 
active.
   - `AIRFLOW__CORE__LAZY_LOAD_PLUGINS` has no effect — the scan walks 
installed distributions on disk, not loaded modules.
   - The OOM symptom (memory pressure feedback loop) often shows up alongside 
this regression in deployments with tight scheduler memory limits; addressing 
the per-publish cost reduces orphan-adoption-burst load on restart and 
indirectly helps the OOM picture.
   
   ## Are you willing to submit a PR?
   
   - [x] Yes
   
   ## Code of Conduct
   
   - [x] I agree to follow this project's Code of Conduct
   
   ---
   Drafted-by: Claude Code (Opus 4.7); reviewed by @seanmuth before posting
   

