This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 84270cbc234 Fix Java SDK tasks rejected by coordinator connection-ownership check (#68147)
84270cbc234 is described below

commit 84270cbc2348cd635c9d19573b15969dfdb8dbd4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Jun 7 10:26:24 2026 +0200

    Fix Java SDK tasks rejected by coordinator connection-ownership check (#68147)
    
    The TCP connection-ownership verification added in #67781 only accepted
    the supervisor channel when the connecting peer belonged to the spawned
    process's exact PID. In the real Java SDK PROD e2e, the JVM's loopback
    connection is not found under that single PID, so both the `comm` and
    `logs` channels are rejected, the task subprocess dies with "process
    exited with 1 before connecting", and every Java task fails. The Java SDK
    e2e suite is canary-only, so it did not run on #67781 and the breakage
    only surfaced in the nightly main runs.
    
    Trust the connecting peer when it belongs to the child process *or any of
    its descendants* — a launcher (JVM, shell wrapper, or any runtime that
    forks a worker) legitimately connects back from a descendant rather than
    the launched PID itself. A process outside the spawned subtree is still
    rejected, so the security property #67781 added is preserved. The lookup
    is also retried for up to one second (polling every 50 ms by default) to
    absorb the race where a freshly established connection is not yet visible
    in `/proc`.
---
 .../src/airflow/sdk/coordinators/_subprocess.py    | 63 ++++++++++++++---
 .../tests/task_sdk/coordinators/test_subprocess.py | 78 +++++++++++++++++++++-
 2 files changed, 129 insertions(+), 12 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
index c223243eb6f..89dfe5f71d7 100644
--- a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
+++ b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
@@ -73,22 +73,63 @@ def _socket_address(value: tuple | str) -> tuple[str, int] | None:
     return str(host), int(port)
 
 
-def _is_connection_from_process(conn: socket.socket, proc: subprocess.Popen) -> bool:
-    """Return whether the accepted TCP connection belongs to the child process."""
+def _connection_owned_by_process_tree(
+    peer: tuple[str, int], local: tuple[str, int], proc: subprocess.Popen
+) -> bool:
+    """
+    Return whether ``peer`` <-> ``local`` is an established connection in the child's process tree.
+
+    The launched child may itself spawn the process that connects back to the
+    supervisor — a JVM launcher, a shell wrapper, or any runtime that forks a
+    worker — so the connecting peer can legitimately belong to a *descendant* of
+    ``proc.pid`` rather than ``proc.pid`` itself. Every process in the subtree
+    rooted at ``proc.pid`` is part of the task and is trusted; a process outside
+    that subtree (e.g. an unrelated local process racing for the port) is not.
+    """
+    try:
+        root = psutil.Process(proc.pid)
+        processes = [root, *root.children(recursive=True)]
+    except (psutil.AccessDenied, psutil.NoSuchProcess, psutil.ZombieProcess, OSError):
+        return False
+    for process in processes:
+        try:
+            connections = process.net_connections(kind="tcp")
+        except (psutil.AccessDenied, psutil.NoSuchProcess, psutil.ZombieProcess, OSError):
+            # A descendant may exit between enumeration and inspection — skip it
+            # rather than failing verification for the whole tree.
+            continue
+        for connection in connections:
+            if _socket_address(connection.laddr) == peer and _socket_address(connection.raddr) == local:
+                return True
+    return False
+
+
+def _is_connection_from_process(
+    conn: socket.socket,
+    proc: subprocess.Popen,
+    *,
+    verify_timeout: float = 1.0,
+    poll_interval: float = 0.05,
+) -> bool:
+    """
+    Return whether the accepted TCP connection originates from the child process tree.
+
+    The connection is trusted only if it belongs to ``proc.pid`` or one of its
+    descendants. A freshly established connection is not always visible in
+    ``/proc`` the instant it is accepted, so the lookup is retried for up to
+    *verify_timeout* seconds before the connection is rejected.
+    """
     peer = _socket_address(conn.getpeername())
     local = _socket_address(conn.getsockname())
     if peer is None or local is None:
         return False
-    try:
-        process = psutil.Process(proc.pid)
-        connections = process.net_connections(kind="tcp")
-    except (psutil.AccessDenied, psutil.NoSuchProcess, psutil.ZombieProcess, OSError):
-        log.warning("Unable to verify child process connection", pid=proc.pid, exc_info=True)
-        return False
-    for connection in connections:
-        if _socket_address(connection.laddr) == peer and _socket_address(connection.raddr) == local:
+    deadline = time.monotonic() + verify_timeout
+    while True:
+        if _connection_owned_by_process_tree(peer, local, proc):
             return True
-    return False
+        if time.monotonic() >= deadline:
+            return False
+        time.sleep(poll_interval)
 
 
 def _accept_connections(
diff --git a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
index 95a41e6282b..8f91e729532 100644
--- a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
+++ b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
@@ -27,6 +27,7 @@ import time
 from unittest.mock import ANY, MagicMock, call, patch
 
 import attrs
+import psutil
 import pytest
 from uuid6 import uuid7
 
@@ -34,6 +35,7 @@ from airflow.sdk.api.client import Client, TaskInstanceOperations
 from airflow.sdk.coordinators._subprocess import (
     SubprocessCoordinator,
     _accept_connections,
+    _connection_owned_by_process_tree,
     _is_connection_from_process,
     _PopenActivitySubprocess,
     _ResourceTracker,
@@ -350,13 +352,87 @@ class TestConnectionFromProcess:
 
         try:
             with patch("airflow.sdk.coordinators._subprocess.psutil.Process") as mock_process:
+                mock_process.return_value.children.return_value = []
                 mock_process.return_value.net_connections.return_value = []
-                assert _is_connection_from_process(conn, mock_proc) is False
+                assert _is_connection_from_process(conn, mock_proc, verify_timeout=0.0) is False
         finally:
             conn.close()
             client.close()
             server.close()
 
+    def test_matches_descendant_process_tcp_connection(self):
+        """A connection owned by a *descendant* of the child process is accepted.
+
+        Regression test for the Java coordinator (#67781): the launched process
+        may itself spawn the runtime that connects back, so the peer can belong
+        to a descendant of ``proc.pid`` rather than ``proc.pid`` directly.
+        """
+        server = _start_server()
+        host, port = server.getsockname()
+        # A real subprocess — a descendant of this test process — opens the connection.
+        connector = subprocess.Popen(
+            [
+                sys.executable,
+                "-c",
+                "import socket, sys, time; s = socket.socket(); "
+                "s.connect((sys.argv[1], int(sys.argv[2]))); time.sleep(30)",
+                host,
+                str(port),
+            ],
+        )
+        mock_proc = MagicMock(spec=subprocess.Popen)
+        mock_proc.pid = os.getpid()  # connector is a descendant of this process
+
+        try:
+            conn, _ = server.accept()
+            try:
+                assert _is_connection_from_process(conn, mock_proc) is True
+            finally:
+                conn.close()
+        finally:
+            connector.terminate()
+            connector.wait(timeout=5)
+            server.close()
+
+    def test_retries_until_ownership_is_confirmed(self):
+        """The lookup is retried while the connection is not yet visible in /proc."""
+        conn = MagicMock()
+        conn.getpeername.return_value = ("127.0.0.1", 5000)
+        conn.getsockname.return_value = ("127.0.0.1", 6000)
+        mock_proc = MagicMock(spec=subprocess.Popen)
+        mock_proc.pid = 999
+
+        with patch(
+            "airflow.sdk.coordinators._subprocess._connection_owned_by_process_tree",
+            side_effect=[False, False, True],
+        ) as mock_owned:
+            assert _is_connection_from_process(conn, mock_proc, poll_interval=0.0) is True
+        assert mock_owned.call_count == 3
+
+    def test_rejects_when_ownership_never_confirmed(self):
+        conn = MagicMock()
+        conn.getpeername.return_value = ("127.0.0.1", 5000)
+        conn.getsockname.return_value = ("127.0.0.1", 6000)
+        mock_proc = MagicMock(spec=subprocess.Popen)
+        mock_proc.pid = 999
+
+        with patch(
+            "airflow.sdk.coordinators._subprocess._connection_owned_by_process_tree",
+            return_value=False,
+        ):
+            assert (
+                _is_connection_from_process(conn, mock_proc, verify_timeout=0.0, poll_interval=0.0) is False
+            )
+
+    def test_owned_by_tree_returns_false_when_process_gone(self):
+        mock_proc = MagicMock(spec=subprocess.Popen)
+        mock_proc.pid = 999999
+        with patch(
+            "airflow.sdk.coordinators._subprocess.psutil.Process",
+            side_effect=psutil.NoSuchProcess(999999),
+        ):
+            assert _connection_owned_by_process_tree(("127.0.0.1", 1), ("127.0.0.1", 2), mock_proc) is False
+
 
 class TestResourceTracker:
     """

Reply via email to