potiuk opened a new pull request, #66602:
URL: https://github.com/apache/airflow/pull/66602
## Summary
`providers/celery/tests/integration/celery/test_celery_executor.py::TestCeleryExecutor::test_celery_integration`
has been failing intermittently with `httpx.ConnectError: [Errno 111]
Connection refused`. The test is supposed to short-circuit task execution with
a fake `execute_workload`, but the worker was sometimes invoking the real one,
which calls the Execution API and fails when no API server is running.
## Root cause
`_prepare_app` registers the fake task on a fresh `test_app` via:
```python
test_execute = test_app.task(name=execute_name)(execute)
```
`@app.task(...)` defaults to `shared=True`, which adds a `cons` callback
to celery's **process-global** `celery._state._on_app_finalizers` set. This
happens for both the *real* `execute_workload` (decorated at module import)
and the *fake* registered here.
When `start_worker(app=test_app)` calls `test_app.finalize()`, celery
iterates that set and runs each `cons(test_app)`:
```python
def _announce_app_finalized(app):
    callbacks = set(_on_app_finalizers)
    for callback in callbacks:
        callback(app)
```
Set iteration order is **hash-based**, so it is independent of registration
order and effectively non-deterministic. Each `cons` calls
`test_app._task_from_fun(fn, name="execute_workload")`, which
short-circuits if the name is already in `_tasks`:
```python
if name not in self._tasks:
    ...  # create the task and store it in self._tasks
else:
    task = self._tasks[name]  # keeps existing, doesn't replace
```
So whichever finalizer fires first wins. If the real one wins, the worker
invokes the real `execute_workload` → `BaseExecutor.run_workload` →
`supervise_task` → tries to connect to the Execution API at `localhost:8080` →
`Connection refused`. The test asserts that the `success` task ended in
`State.SUCCESS` but gets `State.FAILED`. `@pytest.mark.flaky(reruns=5)`
doesn't help because the cons-registration outcome is consistent within a
process (function ids hash the same), so every rerun sees the same winner.
The trace from the failed runs shows the log line from
`celery_executor_utils.py:221` (inside the *real* `execute_workload`),
confirming the fake never won.
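The first-registration-wins behaviour can be illustrated with a small toy model. This is a sketch in plain Python, not celery's code: `ToyApp`, `make_cons`, and `winner` are hypothetical names, and the explicit `order` argument stands in for whatever order set iteration happens to yield.

```python
class ToyApp:
    """Hypothetical stand-in for a celery app's task registry."""

    def __init__(self):
        self.tasks = {}

    def task_from_fun(self, fun, name):
        # Same shape as the branch quoted above: an existing name is kept.
        if name not in self.tasks:
            self.tasks[name] = fun
        return self.tasks[name]

    def finalize(self, order):
        # Run every finalizer callback against this app, in the given order.
        for callback in order:
            callback(self)


def real_execute():
    return "real"


def fake_execute():
    return "fake"


def make_cons(fun):
    # Each callback tries to register its function under the same task name.
    def cons(app):
        return app.task_from_fun(fun, name="execute_workload")
    return cons


real_cons = make_cons(real_execute)
fake_cons = make_cons(fake_execute)


def winner(order):
    app = ToyApp()
    app.finalize(order)
    return app.tasks["execute_workload"]()


real_first = winner([real_cons, fake_cons])
fake_first = winner([fake_cons, real_cons])
```

In this model the task that ends up registered depends only on which callback runs first, which is the property that makes the test outcome depend on set iteration order.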
## Fix
Force the fake to win deterministically:
1. Finalize `test_app` explicitly. The real finalizer registers the real
task in `test_app._tasks`.
2. `pop` that entry.
3. Register the fake via `test_app.task(name=..., shared=False)(execute)`.
With `shared=False` the fake doesn't add itself to `_on_app_finalizers`, so it
stays scoped to `test_app`. With the app already finalized, `lazy=True`
short-circuits to eager registration, and the resulting `Task` object is what
`mock.patch.object(celery_executor_utils, execute_name, test_execute)` patches
in.
Also drop the `celery_executor_utils.execute_workload.__wrapped__ = execute`
lines — celery dispatches tasks via `Task.run`, not `__wrapped__`, so those
reassignments weren't redirecting anything.
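As a plain-Python analogy (not celery's dispatch code), the snippet below shows why reassigning `__wrapped__` does not redirect a call: the attribute is metadata, while the call path is fixed in the callable's body. The names `dispatcher`, `real_execute`, and `fake_execute` are hypothetical.

```python
import functools


def real_execute():
    return "real"


def fake_execute():
    return "fake"


@functools.wraps(real_execute)  # sets dispatcher.__wrapped__ = real_execute
def dispatcher():
    # The call path is fixed here, independent of the __wrapped__ attribute.
    return real_execute()


# Reassigning the attribute changes metadata only.
dispatcher.__wrapped__ = fake_execute
after_reassign = dispatcher()
```

The call still reaches `real_execute` after the reassignment, which matches the observation that those lines had no effect on what the worker ran.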
## Tests
The change touches only the test setup. The existing
`test_celery_integration` parametrizations are what verify the fix — they now
reliably hit the fake `execute_workload` regardless of finalizer iteration
order.
This addresses the kind of failure seen in
https://github.com/apache/airflow/actions/runs/25548565094/job/74993414446 (and
similar recent flaky runs of this test).
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes — Claude Code (Opus 4.7)
Generated-by: Claude Code (Opus 4.7) following [the
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)