This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 977935d14b4 airflow-ctl-tests: wait for Dag run terminal state before xcom commands (#67065)
977935d14b4 is described below

commit 977935d14b4e8a2d094b7edbc2563b9d56641058
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun May 17 21:51:02 2026 +0200

    airflow-ctl-tests: wait for Dag run terminal state before xcom commands (#67065)
    
    The airflowctl xcom integration tests are flaky on ARM CI: xcom add plus
    xcom get and xcom list succeed, then xcom edit and xcom delete fail with
    "The XCom with key ... doesn't exist". Same SHA passes on some ARM runs
    and fails on others.
    
    Root cause is a race between the test's xcom add and execution of
    runme_0. When the task transitions to RUNNING, the execution API
    queries every XCom key for that task instance and tells the worker to
    clear them via xcom_keys_to_clear (see
    airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
    and task-sdk/src/airflow/sdk/execution_time/task_runner.py). If the
    xcom add lands before the task starts running -- which only takes a
    few seconds for the bash echo task in example_bash_operator -- the
    user-added XCom is wiped right before xcom edit / xcom delete fires.
    
    Wait for the targeted Dag run to reach a terminal state before each
    xcom command. Once the task is terminal it won't run again, so the
    clear logic doesn't fire and the user-added XCom survives the rest of
    the xcom commands.
---
 .../tests/airflowctl_tests/conftest.py             | 72 +++++++++++++++++++++-
 1 file changed, 69 insertions(+), 3 deletions(-)

diff --git a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
index cbcea44ec36..31a3696a161 100644
--- a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
+++ b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
@@ -16,9 +16,13 @@
 # under the License.
 from __future__ import annotations
 
+import json
 import os
+import re
 import subprocess
 import sys
+import time
+from subprocess import PIPE, STDOUT, Popen
 
 import pytest
 import requests
@@ -35,6 +39,65 @@ from airflowctl_tests.constants import (
 
 from tests_common.test_utils.fernet import generate_fernet_key_string
 
+# XCom add/edit/delete race against task execution: when the target task transitions to
+# RUNNING, the execution API tells the worker to clear every XCom key currently stored
+# for that task instance (see `xcom_keys_to_clear` in
+# airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py). Any
+# XCom the test just added through airflowctl is wiped, and the next xcom edit/delete
+# command then fails with "XCom doesn't exist". Waiting for the Dag run to reach a
+# terminal state means the task has already run (and won't run again), so user-added
+# XComs survive the rest of the xcom commands.
+_XCOM_TARGET_PATTERN = re.compile(r'^xcom\s+(?:add|get|list|edit|delete)\s+(\S+)\s+"(manual__[^"]+)"')
+_DAG_RUN_TERMINAL_STATES = frozenset({"success", "failed"})
+
+
+def _airflowctl_dag_run_state(dag_id: str, dag_run_id: str, env_vars: dict, skip_login: bool) -> str | None:
+    """Return the current state of a Dag run via airflowctl, or None on timeout or unparsable output."""
+    host_envs = os.environ.copy()
+    host_envs.update(env_vars)
+
+    get_cmd = f'airflowctl dagrun get {dag_id} "{dag_run_id}" -o json'
+    if not skip_login:
+        get_cmd = f"airflowctl {LOGIN_COMMAND} && {get_cmd}"
+
+    proc = Popen(get_cmd.encode(), stdout=PIPE, stderr=STDOUT, shell=True, env=host_envs)
+    try:
+        out, _ = proc.communicate(timeout=20)
+    except subprocess.TimeoutExpired:
+        proc.kill()
+        return None
+    out_str = out.decode()
+    if LOGIN_OUTPUT in out_str:
+        out_str = out_str.split(f"{LOGIN_OUTPUT}\n", 1)[-1].strip()
+    start, end = out_str.find("{"), out_str.rfind("}")
+    if start == -1 or end == -1:
+        return None
+    try:
+        return json.loads(out_str[start : end + 1]).get("state")
+    except json.JSONDecodeError:
+        return None
+
+
+def _wait_for_dag_run_terminal_state(
+    dag_id: str,
+    dag_run_id: str,
+    env_vars: dict,
+    skip_login: bool,
+    timeout: int = 60,
+) -> None:
+    """Block until the Dag run reaches success/failed, or raise TimeoutError."""
+    deadline = time.monotonic() + timeout
+    last_state: str | None = None
+    while time.monotonic() < deadline:
+        last_state = _airflowctl_dag_run_state(dag_id, dag_run_id, env_vars, skip_login)
+        if last_state in _DAG_RUN_TERMINAL_STATES:
+            return
+        time.sleep(1)
+    raise TimeoutError(
+        f"Dag run {dag_id}/{dag_run_id} did not reach terminal state in {timeout}s "
+        f"(last seen state: {last_state})"
+    )
+
 
 @pytest.fixture(scope="module")
 def api_token():
@@ -56,9 +119,6 @@ def run_command():
     """Fixture that provides a helper to run airflowctl commands."""
 
     def _run_command(command: str, env_vars: dict, skip_login: bool = False) -> str:
-        import os
-        from subprocess import PIPE, STDOUT, Popen
-
         host_envs = os.environ.copy()
         host_envs.update(env_vars)
 
@@ -70,6 +130,12 @@ def run_command():
         else:
             run_cmd = command_from_config
 
+        # See `_XCOM_TARGET_PATTERN` above for why xcom commands have to wait for the
+        # Dag run to be terminal before running.
+        xcom_match = _XCOM_TARGET_PATTERN.match(command)
+        if xcom_match:
+            _wait_for_dag_run_terminal_state(xcom_match.group(1), xcom_match.group(2), env_vars, skip_login)
+
         console.print(f"[yellow]Running command: {command}")
 
         # Give some time for the command to execute and output to be ready

Reply via email to