This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 95fc11ed217 Ignore redelivered message for already-running task 
(#64052)
95fc11ed217 is described below

commit 95fc11ed217b51e5f256cc0a4d8a389784aec81e
Author: Anish Giri <[email protected]>
AuthorDate: Thu Mar 26 14:52:29 2026 -0500

    Ignore redelivered message for already-running task (#64052)
    
    Catch TaskAlreadyRunningError from the supervisor and raise Celery
    Ignore() to prevent the broker redelivery from being recorded as a
    task failure.
    
    related: #58441
---
 .../src/tests_common/test_utils/version_compat.py  |  1 +
 .../celery/executors/celery_executor_utils.py      | 52 +++++++++++++-------
 .../src/airflow/providers/celery/version_compat.py |  3 +-
 .../unit/celery/executors/test_celery_executor.py  | 55 +++++++++++++++++++++-
 4 files changed, 92 insertions(+), 19 deletions(-)

diff --git a/devel-common/src/tests_common/test_utils/version_compat.py 
b/devel-common/src/tests_common/test_utils/version_compat.py
index 41326e95153..7921f025296 100644
--- a/devel-common/src/tests_common/test_utils/version_compat.py
+++ b/devel-common/src/tests_common/test_utils/version_compat.py
@@ -38,6 +38,7 @@ AIRFLOW_V_3_0_3_PLUS = get_base_airflow_version_tuple() >= 
(3, 0, 3)
 AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
 AIRFLOW_V_3_1_3_PLUS = get_base_airflow_version_tuple() >= (3, 1, 3)
 AIRFLOW_V_3_1_7_PLUS = get_base_airflow_version_tuple() >= (3, 1, 7)
+AIRFLOW_V_3_1_9_PLUS = get_base_airflow_version_tuple() >= (3, 1, 9)
 AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
 
 if AIRFLOW_V_3_1_PLUS:
diff --git 
a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
 
b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
index 768f2d9dbf6..6ac9ce19029 100644
--- 
a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
+++ 
b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
@@ -42,7 +42,11 @@ from celery.signals import import_modules as 
celery_import_modules, worker_ready
 from sqlalchemy import select
 
 from airflow.executors.base_executor import BaseExecutor
-from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS, 
AIRFLOW_V_3_2_PLUS
+from airflow.providers.celery.version_compat import (
+    AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_1_9_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+)
 from airflow.providers.common.compat.sdk import AirflowException, 
AirflowTaskTimeout, Stats, conf, timeout
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
@@ -189,6 +193,7 @@ def on_celery_worker_ready(*args, **kwargs):
 # and deserialization for us
 @app.task(name="execute_workload")
 def execute_workload(input: str) -> None:
+    from celery.exceptions import Ignore
     from pydantic import TypeAdapter
 
     from airflow.executors import workloads
@@ -208,22 +213,35 @@ def execute_workload(input: str) -> None:
        base_url = f"http://localhost:8080{base_url}"
     default_execution_api_server = f"{base_url.rstrip('/')}/execution/"
 
-    if isinstance(workload, workloads.ExecuteTask):
-        supervise(
-            # This is the "wrong" ti type, but it duck types the same. TODO: 
Create a protocol for this.
-            ti=workload.ti,  # type: ignore[arg-type]
-            dag_rel_path=workload.dag_rel_path,
-            bundle_info=workload.bundle_info,
-            token=workload.token,
-            server=conf.get("core", "execution_api_server_url", 
fallback=default_execution_api_server),
-            log_path=workload.log_path,
-        )
-    elif isinstance(workload, workloads.ExecuteCallback):
-        success, error_msg = execute_callback_workload(workload.callback, log)
-        if not success:
-            raise RuntimeError(error_msg or "Callback execution failed")
-    else:
-        raise ValueError(f"CeleryExecutor does not know how to handle 
{type(workload)}")
+    try:
+        if isinstance(workload, workloads.ExecuteTask):
+            supervise(
+                # This is the "wrong" ti type, but it duck types the same. 
TODO: Create a protocol for this.
+                ti=workload.ti,  # type: ignore[arg-type]
+                dag_rel_path=workload.dag_rel_path,
+                bundle_info=workload.bundle_info,
+                token=workload.token,
+                server=conf.get("core", "execution_api_server_url", 
fallback=default_execution_api_server),
+                log_path=workload.log_path,
+            )
+        elif isinstance(workload, workloads.ExecuteCallback):
+            success, error_msg = execute_callback_workload(workload.callback, 
log)
+            if not success:
+                raise RuntimeError(error_msg or "Callback execution failed")
+        else:
+            raise ValueError(f"CeleryExecutor does not know how to handle 
{type(workload)}")
+    except Exception as e:
+        if AIRFLOW_V_3_1_9_PLUS:
+            from airflow.sdk.exceptions import TaskAlreadyRunningError
+
+            if isinstance(e, TaskAlreadyRunningError):
+                log.info("[%s] Task already running elsewhere, ignoring 
redelivered message", celery_task_id)
+                # Raise Ignore() so Celery does not record a FAILURE result 
for this duplicate
+                # delivery. Without this, the broker redelivering the message 
(e.g. after a
+                # visibility timeout) would cause Celery to mark the task as 
failed, even though
+                # the original worker is still executing it successfully.
+                raise Ignore()
+        raise
 
 
 if not AIRFLOW_V_3_0_PLUS:
diff --git a/providers/celery/src/airflow/providers/celery/version_compat.py 
b/providers/celery/src/airflow/providers/celery/version_compat.py
index 0b65e14199e..6d0c6107451 100644
--- a/providers/celery/src/airflow/providers/celery/version_compat.py
+++ b/providers/celery/src/airflow/providers/celery/version_compat.py
@@ -27,6 +27,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_1_9_PLUS = get_base_airflow_version_tuple() >= (3, 1, 9)
 AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
 
-__all__ = ["AIRFLOW_V_3_0_PLUS", "AIRFLOW_V_3_2_PLUS"]
+__all__ = ["AIRFLOW_V_3_0_PLUS", "AIRFLOW_V_3_1_9_PLUS", "AIRFLOW_V_3_2_PLUS"]
diff --git 
a/providers/celery/tests/unit/celery/executors/test_celery_executor.py 
b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index 27328a78d06..ff2c146f828 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -45,7 +45,12 @@ from tests_common.test_utils import db
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.dag import sync_dag_to_db
 from tests_common.test_utils.taskinstance import create_task_instance
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, 
AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS
+from tests_common.test_utils.version_compat import (
+    AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_1_9_PLUS,
+    AIRFLOW_V_3_1_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+)
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.models.dag_version import DagVersion
@@ -761,3 +766,51 @@ def test_celery_tasks_registered_on_import():
         assert "execute_command" in registered_tasks, (
             "execute_command must be registered for Airflow 2.x compatibility."
         )
+
+
[email protected](not AIRFLOW_V_3_1_9_PLUS, reason="TaskAlreadyRunningError 
requires Airflow 3.1.9+")
+def test_execute_workload_ignores_already_running_task():
+    """Test that execute_workload raises Celery Ignore when task is already 
running."""
+    import importlib
+
+    from celery.exceptions import Ignore
+
+    from airflow.sdk.exceptions import TaskAlreadyRunningError
+
+    importlib.reload(celery_executor_utils)
+    execute_workload_unwrapped = 
celery_executor_utils.execute_workload.__wrapped__
+
+    mock_current_task = mock.MagicMock()
+    mock_current_task.request.id = "test-celery-task-id"
+    mock_app = mock.MagicMock()
+    mock_app.current_task = mock_current_task
+
+    with (
+        mock.patch("airflow.sdk.execution_time.supervisor.supervise") as 
mock_supervise,
+        mock.patch.object(celery_executor_utils, "app", mock_app),
+    ):
+        mock_supervise.side_effect = TaskAlreadyRunningError("Task already 
running")
+
+        workload_json = """
+        {
+            "type": "ExecuteTask",
+            "token": "test-token",
+            "dag_rel_path": "test_dag.py",
+            "bundle_info": {"name": "test-bundle", "version": null},
+            "log_path": "test.log",
+            "ti": {
+                "id": "019bdec0-d353-7b68-abe0-5ac20fa75ad0",
+                "dag_version_id": "019bdead-fdcd-78ab-a9f2-aba3b80fded2",
+                "task_id": "test_task",
+                "dag_id": "test_dag",
+                "run_id": "test_run",
+                "try_number": 1,
+                "map_index": -1,
+                "pool_slots": 1,
+                "queue": "default",
+                "priority_weight": 1
+            }
+        }
+        """
+        with pytest.raises(Ignore):
+            execute_workload_unwrapped(workload_json)

Reply via email to