This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 839835052a8 Fix flaky test_celery_integration with deterministic task 
registration (#66602) (#66916)
839835052a8 is described below

commit 839835052a85429e8771f882ca250ec59e110431
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 10:10:30 2026 +0530

    Fix flaky test_celery_integration with deterministic task registration 
(#66602) (#66916)
    
    The test patched a fake execute_workload via test_app.task(name=...)(fake)
    to short-circuit the real one (which would call the Execution API and fail
    with Connection refused, since no API server is running in this setup).
    But @app.task defaults to shared=True, which adds a finalizer to celery's
    process-global _on_app_finalizers set. start_worker triggers 
test_app.finalize(),
    which iterates that set in non-deterministic (hash-based) order. 
_task_from_fun
    keeps the existing entry if the task name is already registered, so 
whichever
    finalizer fires first wins -- making the test pass or fail at random.
    
    Force the fake to win: finalize the app first (real finalizer registers the
    real task), then evict that entry and register the fake explicitly with
    shared=False so it stays out of the global finalizer set.
    
    Also drop the dead __wrapped__ = execute lines -- celery dispatches via
    Task.run, not __wrapped__, so those weren't redirecting anything.
    
    (cherry picked from commit 9575fb3bf9b9c636abb4619c2012029e3119a08d)
    
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../integration/celery/test_celery_executor.py     | 23 ++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/providers/celery/tests/integration/celery/test_celery_executor.py 
b/providers/celery/tests/integration/celery/test_celery_executor.py
index da8ee15571b..eca326e810d 100644
--- a/providers/celery/tests/integration/celery/test_celery_executor.py
+++ b/providers/celery/tests/integration/celery/test_celery_executor.py
@@ -85,16 +85,23 @@ def _prepare_app(broker_url=None, execute=None):
     test_config = dict(celery_executor_utils.get_celery_configuration())
     test_config.update({"broker_url": broker_url})
     test_app = Celery(broker_url, config_source=test_config)
-    # Register the fake execute function with the test_app using the correct 
task name
-    # This ensures workers using test_app will execute the fake function
-    test_execute = test_app.task(name=execute_name)(execute)
+    # Register the fake execute function on test_app under the same task name 
as the real
+    # `execute_workload`. The real task uses `@app.task(...)` (shared=True by 
default),
+    # which adds a finalizer to celery's process-global `_on_app_finalizers` 
set. When
+    # `start_worker(app=test_app)` calls `test_app.finalize()`, celery 
iterates that set in
+    # non-deterministic (hash-based) order and calls `_task_from_fun` for each 
— and
+    # `_task_from_fun` keeps the existing entry if the task name is already 
registered,
+    # so whichever finalizer fires first wins. If the real one wins, the 
worker invokes
+    # the real `execute_workload`, which calls the Execution API at 
localhost:8080 and
+    # fails with `Connection refused` since no API server is running in this 
test setup.
+    # To make the fake win deterministically: finalize the app first (real 
finalizer
+    # registers the real task), then evict that entry and register the fake 
explicitly
+    # with `shared=False` so it stays out of the global finalizer set.
+    test_app.finalize()
+    test_app._tasks.pop(execute_name, None)
+    test_execute = test_app.task(name=execute_name, shared=False)(execute)
     patch_app = mock.patch.object(celery_executor_utils, "app", test_app)
 
-    if AIRFLOW_V_3_0_PLUS:
-        celery_executor_utils.execute_workload.__wrapped__ = execute
-    else:
-        celery_executor_utils.execute_command.__wrapped__ = execute
-
     patch_execute = mock.patch.object(celery_executor_utils, execute_name, 
test_execute)
     # Patch factory function so CeleryExecutor instances get the test app
     patch_factory = mock.patch.object(celery_executor_utils, 
"create_celery_app", return_value=test_app)

Reply via email to