This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b1d58aedb2 Allow using fresh interpreter besides fork() in Edge Worker (#65943)
9b1d58aedb2 is described below

commit 9b1d58aedb2a055a9575297ce3691279cbb76a95
Author: Diogo Silva <[email protected]>
AuthorDate: Thu May 14 17:12:18 2026 +0100

    Allow using fresh interpreter besides fork() in Edge Worker (#65943)
    
    * fix(edge3): replace fork() with subprocess.Popen to prevent deadlocks in multi-threaded workers
    
    The edge worker process runs 22+ threads (asyncio event loop,
    ThreadPoolExecutor, HTTP clients). When `_launch_job()` used
    `multiprocessing.Process` (fork start method), `os.fork()` copied
    locked import locks from other threads into the child. Since only the
    forking thread survives, those locks are never released — causing
    permanent deadlocks on any subsequent import in the child process.
    
    A non-deadlock variant also occurs where the child inherits corrupted
    `sys.modules` state, causing `ModuleNotFoundError` cascades for all
    plugin and DAG imports.
    
    This commit replaces the `multiprocessing.Process` fork with
    `subprocess.Popen` launching a fresh Python interpreter via the
    existing `airflow.sdk.execution_time.execute_workload` CLI entrypoint.
    The `ExecuteTask` workload is already a Pydantic model with
    `model_dump_json()` — the same serialization path used by the ECS
    executor and the edge executor's own DB storage.
    
    Changes:
    - `worker.py`: Replace `_launch_job` to use `subprocess.Popen` with
      `execute_workload --json-string`. Remove `_run_job_via_supervisor`,
      `_reset_parent_signal_state`, `multiprocessing` imports, and the
      `results_queue` plumbing.
    - `dataclasses.py`: Change `Job.process` type from
      `multiprocessing.Process` to `subprocess.Popen`. Update `is_running`
      to use `poll()` and `is_success` to check `returncode`.
    - `test_worker.py`: Update mocks and assertions to match the new
      subprocess-based approach.
    
    Fixes: #65942
    
    * Fix test_worker.py to use multiprocessing.Process instead of subprocess.Popen
    
    * Honor fresh interpreter mode in Edge worker
    
    * Clarify Edge worker task process handling
    
    * Improve Edge worker subprocess failure handling
    
    * fix: rollback supervise changes & fix tests related to display_name
    
    * Unify error transport for fork and subprocess paths via temp file
    
    Remove the multiprocessing.Queue from the fork execution path and use a
    plain temp file for both fork and subprocess paths. Both paths now write
    failure text to a NamedTemporaryFile (Path stored as Job.stderr_file_path);
    the parent reads it after the child exits via Job.failure_details() and
    pushes the content to the task log via logs_push.
    
    Benefits over the Queue approach:
    - No risk of buffer deadlock (the Queue deadlock was the original issue)
    - Works identically for both fork and subprocess children
    - Simpler: no IPC setup, no draining loop, no Queue import
    - Error file is only read on failure (and only written on failure in the
      fork path); task logs cover the success path
    
    Also extract _make_task_temp_file() helper to avoid duplicating the
    NamedTemporaryFile creation pattern across _launch_job_subprocess and
    _launch_job_fork.
---
 .../src/airflow/providers/edge3/cli/dataclasses.py |  28 +-
 .../src/airflow/providers/edge3/cli/worker.py      | 121 +++++--
 .../edge3/tests/unit/edge3/cli/test_worker.py      | 362 ++++++++++++++-------
 3 files changed, 373 insertions(+), 138 deletions(-)

diff --git a/providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py 
b/providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py
index 63e12f6f810..e3e3d3f09e6 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import json
+import subprocess
 from dataclasses import asdict, dataclass
 from multiprocessing import Process
 from pathlib import Path
@@ -72,17 +73,42 @@ class Job:
     """Holds all information for a task/job to be executed as bundle."""
 
     edge_job: EdgeJobFetched
-    process: Process
+    process: subprocess.Popen | Process
+    """Can be subprocess.Popen (for the spawn path) or multiprocessing.Process 
(for the fork path)."""
     logfile: Path
     logsize: int = 0
     """Last size of log file, point of last chunk push."""
+    stderr_file_path: Path | None = None
+    """Path to file where error details are written on failure (stderr for 
subprocess path, traceback text for fork path)."""
 
     @property
     def is_running(self) -> bool:
         """Check if the job is still running."""
+        if isinstance(self.process, subprocess.Popen):
+            return self.process.poll() is None
         return self.process.is_alive()
 
     @property
     def is_success(self) -> bool:
         """Check if the job was successful."""
+        if isinstance(self.process, subprocess.Popen):
+            return self.process.returncode == 0
         return self.process.exitcode == 0
+
+    def failure_details(self) -> str:
+        """Format failure details, reading error text from the error file if 
available."""
+        error_output = ""
+        if self.stderr_file_path and self.stderr_file_path.exists():
+            error_output = 
self.stderr_file_path.read_bytes().decode(errors="backslashreplace").strip()
+        if isinstance(self.process, subprocess.Popen):
+            ex_txt = f"Task subprocess exited with code 
{self.process.returncode}"
+        else:
+            ex_txt = f"Task fork exited with code {self.process.exitcode}"
+        if error_output:
+            ex_txt = f"{ex_txt}\n{error_output}"
+        return ex_txt
+
+    def cleanup(self) -> None:
+        """Remove transient files owned by this job."""
+        if self.stderr_file_path:
+            self.stderr_file_path.unlink(missing_ok=True)
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py 
b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index 778af0dc325..fec957f06eb 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -19,7 +19,9 @@ from __future__ import annotations
 import logging
 import os
 import signal
+import subprocess
 import sys
+import tempfile
 import time
 import traceback
 from asyncio import Task, create_task, gather, get_running_loop, sleep
@@ -28,9 +30,9 @@ from contextlib import suppress
 from datetime import datetime
 from functools import cached_property
 from http import HTTPStatus
-from multiprocessing import Process, Queue
+from multiprocessing import Process
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import IO, TYPE_CHECKING
 
 import anyio
 from aiofiles import open as aio_open
@@ -66,6 +68,7 @@ from airflow.utils.state import TaskInstanceState
 if TYPE_CHECKING:
     from airflow.configuration import AirflowConfigParser
     from airflow.executors.workloads import ExecuteTask
+    from airflow.providers.edge3.worker_api.datamodels import EdgeJobFetched
 
 logger = logging.getLogger(__name__)
 
@@ -75,6 +78,12 @@ else:
     from setproctitle import setproctitle
 
 
+def _make_task_temp_file(prefix: str) -> tuple[IO[bytes], Path]:
+    """Create a named temporary file for task output capture and return the 
open file and its path."""
+    f = tempfile.NamedTemporaryFile(prefix=prefix, suffix=".log", delete=False)
+    return f, Path(f.name)
+
+
 def _edge_hostname() -> str:
     """Get the hostname of the edge worker that should be reported by tasks."""
     return os.environ.get("HOSTNAME", getfqdn())
@@ -413,7 +422,8 @@ class EdgeWorker:
             return EdgeWorkerState.MAINTENANCE_MODE
         return EdgeWorkerState.IDLE
 
-    def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: 
Queue) -> int:
+    def _run_job_via_supervisor(self, workload: ExecuteTask, error_file_path: 
Path) -> int:
+        """Run a task by calling the supervisor directly (executes inside a 
forked child process)."""
         _reset_parent_signal_state()
 
         # Ignore ctrl-c in this process -- we don't want to kill _this_ one. 
we let tasks run to completion
@@ -448,23 +458,89 @@ class EdgeWorker:
                     server=self._execution_api_server_url,
                     log_path=workload.log_path,
                 )
-            results_queue.put("OK")
             return 0
-        except Exception as e:
+        except Exception:
             logger.exception("Task execution failed")
-            results_queue.put(e)
+            with suppress(Exception):
+                error_file_path.write_text(traceback.format_exc())
             return 1
 
-    def _launch_job(self, workload: ExecuteTask) -> tuple[Process, 
Queue[Exception]]:
+    def _launch_job_subprocess(self, workload: ExecuteTask) -> 
tuple[subprocess.Popen, Path]:
+        """Launch workload via a fresh Python interpreter 
(subprocess.Popen)."""
+        env = os.environ.copy()
+        if self._execution_api_server_url:
+            env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] = 
self._execution_api_server_url
+
+        # Keep stderr off a PIPE: the worker only inspects stderr after the 
task finishes,
+        # so a verbose child could otherwise fill the pipe buffer and block 
forever. Also keep
+        # it task-scoped instead of inheriting the worker's stderr/stdout; 
supervisor startup
+        # failures should be pushed to the task log, not only the 
worker/container log.
+        stderr_file, stderr_file_path = 
_make_task_temp_file("airflow-edge-task-stderr-")
+        try:
+            process = subprocess.Popen(
+                [
+                    sys.executable,
+                    "-m",
+                    "airflow.sdk.execution_time.execute_workload",
+                    "--json-string",
+                    workload.model_dump_json(),
+                ],
+                env=env,
+                start_new_session=True,
+                stderr=stderr_file,
+            )
+        except Exception:
+            stderr_file_path.unlink(missing_ok=True)
+            raise
+        finally:
+            # Close the parent's copy of the fd. Popen already dup2()'d it 
into the child,
+            # so the child's stderr remains open and writable. The parent 
reads the output
+            # later via stderr_file_path (the Path) once the child has exited.
+            stderr_file.close()
+        logger.info(
+            "Launched task subprocess pid=%d for %s",
+            process.pid,
+            workload.ti.id,
+        )
+        return process, stderr_file_path
+
+    def _launch_job_fork(self, workload: ExecuteTask) -> tuple[Process, Path]:
+        """Launch workload by forking the current process 
(multiprocessing.Process)."""
         # Improvement: Use frozen GC to prevent child process from copying 
unnecessary memory
         # See _spawn_workers_with_gc_freeze() in 
airflow-core/src/airflow/executors/local_executor.py
-        results_queue: Queue[Exception] = Queue()
+        error_file, error_file_path = 
_make_task_temp_file("airflow-edge-task-error-")
+        error_file.close()  # child writes to the file by path; parent only 
reads it after exit
         process = Process(
             target=self._run_job_via_supervisor,
-            kwargs={"workload": workload, "results_queue": results_queue},
+            kwargs={"workload": workload, "error_file_path": error_file_path},
         )
         process.start()
-        return process, results_queue
+        logger.info("Launched task fork pid=%d for %s", process.pid, 
workload.ti.id)
+        return process, error_file_path
+
+    def _launch_job(self, edge_job: EdgeJobFetched, workload: ExecuteTask, 
logfile: Path) -> Job:
+        """
+        Launch a task process.
+
+        Uses ``subprocess.Popen`` (fresh interpreter) when
+        ``core.execute_tasks_new_python_interpreter`` is ``True`` or when
+        ``os.fork`` is unavailable (e.g. Windows).  Falls back to
+        ``multiprocessing.Process`` (fork) otherwise — preserving the
+        original behaviour for existing deployments.
+        """
+        use_new_interpreter = not hasattr(os, "fork") or self.conf.getboolean(
+            "core",
+            "execute_tasks_new_python_interpreter",
+            fallback=False,
+        )
+        if use_new_interpreter:
+            # Fresh subprocess path: spawn a new Python interpreter; no shared 
memory with parent
+            # Technically safer and more robust, but with more overhead
+            subprocess_process, stderr_file_path = 
self._launch_job_subprocess(workload)
+            return Job(edge_job, subprocess_process, logfile, 
stderr_file_path=stderr_file_path)
+        # Fork path: clone the current process; child inherits parent memory
+        fork_process, error_file_path = self._launch_job_fork(workload)
+        return Job(edge_job, fork_process, logfile, 
stderr_file_path=error_file_path)
 
     async def _push_logs_in_chunks(self, job: Job):
         aio_logfile = anyio.Path(job.logfile)
@@ -589,11 +665,10 @@ class EdgeWorker:
         logger.info("Received job: %s", edge_job.identifier)
 
         workload: ExecuteTask = edge_job.command
-        process, results_queue = self._launch_job(workload)
         if TYPE_CHECKING:
             assert workload.log_path  # We need to assume this is defined in 
here
         logfile = Path(self.base_log_folder, workload.log_path)
-        job = Job(edge_job, process, logfile)
+        job = self._launch_job(edge_job, workload, logfile)
         self.jobs.append(job)
         await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)
 
@@ -603,39 +678,31 @@ class EdgeWorker:
             self.background_tasks.add(task)
             task.add_done_callback(self.background_tasks.discard)
 
-        while job.is_running and results_queue.empty():
+        while job.is_running:
             await self._push_logs_in_chunks(job)
             for _ in range(0, self.job_poll_interval * 10):
                 await sleep(0.1)
                 if not job.is_running:
                     break
         await self._push_logs_in_chunks(job)
-        supervisor_msg = (
-            "(Unknown error, no exception details available)"
-            if results_queue.empty()
-            else results_queue.get()
-        )
-        # Ensure that supervisor really ended after we grabbed results from 
queue
-        while True:
-            if not job.is_running:
-                break
-            await sleep(0.1)
 
         self.jobs.remove(job)
         if job.is_success:
             logger.info("Job completed: %s", job.edge_job.identifier)
             await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
         else:
-            if isinstance(supervisor_msg, Exception):
-                supervisor_msg = 
"\n".join(traceback.format_exception(supervisor_msg))
-            logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, 
supervisor_msg)
+            ex_txt = job.failure_details()
+            logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, 
ex_txt)
+
             # Push it upwards to logs for better diagnostic as well
             await logs_push(
                 task=job.edge_job.key,
                 log_chunk_time=timezone.utcnow(),
-                log_chunk_data=f"Error executing job:\n{supervisor_msg}",
+                log_chunk_data=f"Error executing job:\n{ex_txt}",
             )
             await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)
+        # Cleanup temp files used for the job
+        job.cleanup()
 
     async def heartbeat(self, new_maintenance_comments: str | None = None) -> 
bool:
         """Report liveness state of worker to central site with stats."""
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py 
b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 0be1573353d..b50af2f7197 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -16,16 +16,17 @@
 # under the License.
 from __future__ import annotations
 
-import asyncio
 import contextlib
 import importlib
 import json
 import logging
-import multiprocessing
+import os
 import signal
+import subprocess
+import sys
 from datetime import datetime
 from io import StringIO
-from multiprocessing import Process, Queue
+from multiprocessing import Process
 from pathlib import Path
 from unittest import mock
 from unittest.mock import call, patch
@@ -52,6 +53,7 @@ from airflow.providers.edge3.worker_api.datamodels import (
     WorkerRegistrationReturn,
     WorkerSetStateReturn,
 )
+from airflow.utils.state import TaskInstanceState
 
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_2_PLUS, 
AIRFLOW_V_3_3_PLUS
@@ -81,21 +83,9 @@ MOCK_COMMAND = {
 }
 
 
-def _emit_large_exception_target(results_queue):
-    """Worker-process target used by 
``test_fetch_and_run_job_possible_deadlock``.
-
-    Pushes a >64 KB pickled exception to ``results_queue``. On Linux the OS 
pipe
-    backing ``multiprocessing.Queue`` only has ~64 KB of buffer, so the queue's
-    feeder thread blocks on ``send_bytes`` and the subprocess can't terminate
-    until the parent reads from the queue — exactly the production deadlock
-    condition that #66144 fixed.
-    """
-    results_queue.put(Exception(f"Task execution failed with large error 
message {'-' * 66000}"))
-
-
 class _MockProcess(Process):
     def __init__(self, returncode=None):
-        self.generated_returncode = None
+        self.generated_returncode = returncode
         self._is_alive = False
 
     def poll(self):
@@ -105,6 +95,10 @@ class _MockProcess(Process):
     def returncode(self):
         return self.generated_returncode
 
+    @property
+    def exitcode(self):
+        return self.generated_returncode
+
     def is_alive(self):
         return self._is_alive
 
@@ -118,6 +112,15 @@ class _MockProcess(Process):
         pass
 
 
+class _MockPopen(subprocess.Popen):
+    def __init__(self, returncode: int | None = None, pid: int = 1234):
+        self.returncode = returncode
+        self.pid = pid
+
+    def poll(self):
+        return self.returncode
+
+
 class TestEdgeWorker:
     @pytest.fixture(autouse=True)
     def setup_parser(self):
@@ -234,6 +237,98 @@ class TestEdgeWorker:
             url = test_worker._execution_api_server_url
             assert url == expected_url
 
+    @pytest.mark.parametrize(
+        ("has_fork", "use_new_interpreter", "expected_launch_method"),
+        [
+            pytest.param(True, False, "fork", 
id="fork_available_config_false"),
+            pytest.param(True, True, "subprocess", 
id="fork_available_config_true"),
+            pytest.param(False, False, "subprocess", 
id="fork_unavailable_config_false"),
+            pytest.param(False, True, "subprocess", 
id="fork_unavailable_config_true"),
+        ],
+    )
+    def test_launch_job_honors_execute_tasks_new_python_interpreter(
+        self,
+        has_fork,
+        use_new_interpreter,
+        expected_launch_method,
+        monkeypatch,
+        tmp_path: Path,
+        worker_with_job: EdgeWorker,
+    ):
+        if not has_fork:
+            monkeypatch.delattr(os, "fork", raising=False)
+        worker_with_job.conf = mock.MagicMock()
+        worker_with_job.conf.getboolean.return_value = use_new_interpreter
+        edge_job = worker_with_job.jobs[0].edge_job
+        workload = edge_job.command
+        logfile = tmp_path / "mock.log"
+        subprocess_process = _MockPopen(returncode=None)
+        stderr_file_path = tmp_path / "stderr.log"
+        fork_process = _MockProcess()
+        error_file_path = tmp_path / "fork-error.log"
+
+        with (
+            patch.object(
+                worker_with_job, "_launch_job_subprocess", 
return_value=(subprocess_process, stderr_file_path)
+            ) as mock_launch_subprocess,
+            patch.object(
+                worker_with_job, "_launch_job_fork", 
return_value=(fork_process, error_file_path)
+            ) as mock_launch_fork,
+        ):
+            job = worker_with_job._launch_job(edge_job, workload, logfile)
+
+        if has_fork:
+            worker_with_job.conf.getboolean.assert_called_once_with(
+                "core", "execute_tasks_new_python_interpreter", fallback=False
+            )
+        else:
+            worker_with_job.conf.getboolean.assert_not_called()
+        if expected_launch_method == "subprocess":
+            assert job.process is subprocess_process
+            assert job.stderr_file_path == stderr_file_path
+            mock_launch_subprocess.assert_called_once_with(workload)
+            mock_launch_fork.assert_not_called()
+        else:
+            assert job.process is fork_process
+            assert job.stderr_file_path == error_file_path
+            mock_launch_fork.assert_called_once_with(workload)
+            mock_launch_subprocess.assert_not_called()
+
+    @patch("airflow.providers.edge3.cli.worker.subprocess.Popen")
+    def test_launch_job_subprocess_uses_fresh_interpreter_and_spools_stderr(
+        self,
+        mock_popen,
+        worker_with_job: EdgeWorker,
+    ):
+        process = _MockPopen(returncode=None, pid=4321)
+        mock_popen.return_value = process
+        worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
+        workload = worker_with_job.jobs[0].edge_job.command
+        stderr_file_path = None
+
+        try:
+            returned_process, stderr_file_path = 
worker_with_job._launch_job_subprocess(workload)
+            assert returned_process is process
+
+            popen_args, popen_kwargs = mock_popen.call_args
+            assert popen_args[0] == [
+                sys.executable,
+                "-m",
+                "airflow.sdk.execution_time.execute_workload",
+                "--json-string",
+                workload.model_dump_json(),
+            ]
+            assert (
+                popen_kwargs["env"]["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"]
+                == "https://mock-server/execution"
+            )
+            assert popen_kwargs["start_new_session"] is True
+            assert popen_kwargs["stderr"] is not subprocess.PIPE
+            assert Path(popen_kwargs["stderr"].name) == stderr_file_path
+        finally:
+            if stderr_file_path:
+                stderr_file_path.unlink(missing_ok=True)
+
     @patch("airflow.sdk.execution_time.supervisor.supervise")
     @pytest.mark.skipif(AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow < 
3.3.0 where supervise was used")
     @pytest.mark.asyncio
@@ -241,14 +336,15 @@ class TestEdgeWorker:
         self,
         mock_supervise,
         worker_with_job: EdgeWorker,
+        tmp_path: Path,
     ):
         worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
         edge_job = worker_with_job.jobs.pop().edge_job
-        q = mock.MagicMock()
-        result = worker_with_job._run_job_via_supervisor(edge_job.command, q)
+        error_file_path = tmp_path / "fork-error.log"
+        result = worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
 
         assert result == 0
-        q.put.assert_called_once_with("OK")
+        assert not error_file_path.exists()  # no error written on success
 
     @patch("airflow.executors.base_executor.BaseExecutor.run_workload")
     @pytest.mark.skipif(
@@ -259,32 +355,58 @@ class TestEdgeWorker:
         self,
         mock_run_workload,
         worker_with_job: EdgeWorker,
+        tmp_path: Path,
     ):
         worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
         edge_job = worker_with_job.jobs.pop().edge_job
-        q = mock.MagicMock()
-        result = worker_with_job._run_job_via_supervisor(edge_job.command, q)
+        error_file_path = tmp_path / "fork-error.log"
+        result = worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
 
         assert result == 0
-        q.put.assert_called_once_with("OK")
+        assert not error_file_path.exists()  # no error written on success
 
     @patch("airflow.sdk.execution_time.supervisor.supervise")
+    @pytest.mark.skipif(AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow < 
3.3.0 where supervise was used")
     @pytest.mark.asyncio
-    async def test_supervise_launch_fail(
+    async def test_supervise_launch_fail_pre_3_3(
         self,
         mock_supervise,
         worker_with_job: EdgeWorker,
+        tmp_path: Path,
     ):
         mock_supervise.side_effect = Exception("Supervise failed")
+        worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
+        edge_job = worker_with_job.jobs.pop().edge_job
+        error_file_path = tmp_path / "fork-error.log"
+        result = worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
+
+        assert result == 1
+        assert error_file_path.exists()
+        assert "Supervise failed" in error_file_path.read_text()
+
+    @patch("airflow.executors.base_executor.BaseExecutor.run_workload")
+    @pytest.mark.skipif(
+        not AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow >= 3.3.0 where 
BaseExecutor.run_workload is used"
+    )
+    @pytest.mark.asyncio
+    async def test_supervise_launch_fail(
+        self,
+        mock_run_workload,
+        worker_with_job: EdgeWorker,
+        tmp_path: Path,
+    ):
+        mock_run_workload.side_effect = Exception("Supervise failed")
+        worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
         edge_job = worker_with_job.jobs.pop().edge_job
-        q = mock.MagicMock()
-        result = worker_with_job._run_job_via_supervisor(edge_job.command, q)
+        error_file_path = tmp_path / "fork-error.log"
+        result = worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
 
         assert result == 1
-        q.put.assert_called_once()
+        assert error_file_path.exists()
+        assert "Supervise failed" in error_file_path.read_text()
 
     @patch("airflow.providers.edge3.cli.worker.jobs_fetch")
-    @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job", 
return_value=(Process(), Queue()))
+    @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job")
     @pytest.mark.asyncio
     async def test_fetch_and_run_job_no_job(
         self,
@@ -301,7 +423,7 @@ class TestEdgeWorker:
         mock_launch_job.assert_not_called()
 
     @patch("airflow.providers.edge3.cli.worker.jobs_fetch")
-    @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job", 
return_value=(Process(), Queue()))
+    @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job")
     @patch("airflow.providers.edge3.cli.worker.jobs_set_state")
     
@patch("airflow.providers.edge3.cli.worker.EdgeWorker._push_logs_in_chunks")
     @patch("airflow.providers.edge3.cli.worker.logs_push")
@@ -315,20 +437,20 @@ class TestEdgeWorker:
         mock_jobs_set_state,
         mock_launch_job,
         mock_jobs_fetch,
+        tmp_path: Path,
         worker_with_job: EdgeWorker,
     ):
-        mock_jobs_fetch.side_effect = [
-            EdgeJobFetched(
-                dag_id="test",
-                task_id="test",
-                run_id="test",
-                map_index=-1,
-                try_number=1,
-                concurrency_slots=1,
-                command=MOCK_COMMAND,  # type: ignore[arg-type]
-            ),
-            None,
-        ]
+        edge_job = EdgeJobFetched(
+            dag_id="test",
+            task_id="test",
+            run_id="test",
+            map_index=-1,
+            try_number=1,
+            concurrency_slots=1,
+            command=MOCK_COMMAND,  # type: ignore[arg-type]
+        )
+        mock_jobs_fetch.side_effect = [edge_job, None]
+        mock_launch_job.return_value = Job(edge_job, _MockProcess(), tmp_path 
/ "mock.log")
         worker_with_job.concurrency = 1  # only one job at a time
         assert worker_with_job.free_concurrency == 0
 
@@ -337,7 +459,9 @@ class TestEdgeWorker:
         mock_jobs_fetch.assert_called_once()
         fetch_args = mock_jobs_fetch.call_args
         assert fetch_args.args[3] is None  # team_name should be None
-        mock_launch_job.assert_called_once()
+        mock_launch_job.assert_called_once_with(
+            edge_job, edge_job.command, Path(worker_with_job.base_log_folder, 
"mock.log")
+        )
         assert mock_jobs_set_state.call_count == 2
         mock_push_log_chunks.assert_called_once()
         assert len(worker_with_job.jobs) == 1  # no new job added (was removed 
at the end...)
@@ -348,110 +472,128 @@ class TestEdgeWorker:
     
@patch("airflow.providers.edge3.cli.worker.EdgeWorker._push_logs_in_chunks")
     @patch("airflow.providers.edge3.cli.worker.logs_push")
     @pytest.mark.asyncio
-    async def test_fetch_and_run_job_possible_deadlock(
+    async def test_fetch_and_run_job_fork_failure_pushes_error_to_logs(
         self,
         mock_logs_push,
         mock_push_log_chunks,
         mock_jobs_set_state,
         mock_jobs_fetch,
+        tmp_path: Path,
         worker_with_job: EdgeWorker,
     ):
-        """Verify that a large exception from the subprocess does not deadlock 
fetch_and_run_job.
-
-        Uses an explicit ``fork`` context to spawn the simulated worker 
subprocess. Python 3.14
-        flipped the POSIX default ``multiprocessing`` start method to 
``forkserver``, which
-        spawns a fresh interpreter for the child — patches applied in the test 
process do not
-        propagate, so older variants of this test that mocked ``supervise`` no 
longer triggered
-        the deadlock condition. Forking a small top-level target sidesteps 
that and reproduces
-        the actual queue-feeder/pipe-buffer deadlock the fix targets.
-        """
-        mock_jobs_fetch.side_effect = [
-            EdgeJobFetched(
-                dag_id="test",
-                task_id="test",
-                run_id="test",
-                map_index=-1,
-                try_number=1,
-                concurrency_slots=1,
-                command=MOCK_COMMAND,  # type: ignore[arg-type]
-            ),
-            None,
-        ]
-        worker_with_job.concurrency = 1  # only one job at a time
-        assert worker_with_job.free_concurrency == 0
+        edge_job = EdgeJobFetched(
+            dag_id="test",
+            task_id="test",
+            run_id="test",
+            map_index=-1,
+            try_number=1,
+            concurrency_slots=1,
+            command=MOCK_COMMAND,  # type: ignore[arg-type]
+        )
+        mock_jobs_fetch.return_value = edge_job
+        worker_with_job.concurrency = 1
+        process = _MockProcess(returncode=1)
+        error_file_path = tmp_path / "fork-error.log"
+        error_file_path.write_text(
+            "Traceback (most recent call last):\n  ...\nRuntimeError: 
supervisor crashed\n"
+        )
+        launched_job = Job(edge_job, process, tmp_path / "mock.log", 
stderr_file_path=error_file_path)
+
+        with patch.object(worker_with_job, "_launch_job", 
return_value=launched_job):
+            await worker_with_job.fetch_and_run_job()
 
-        ctx = multiprocessing.get_context("fork")
-        results_queue = ctx.Queue()
-        process = ctx.Process(target=_emit_large_exception_target, 
args=(results_queue,))
-
-        with patch.object(EdgeWorker, "_launch_job", return_value=(process, 
results_queue)):
-            process.start()
-            try:
-                await asyncio.wait_for(worker_with_job.fetch_and_run_job(), 
timeout=10.0)
-            except asyncio.TimeoutError:
-                # Clean up any hanging subprocess to prevent blocking pytest
-                if process.is_alive():
-                    process.terminate()
-                    process.join(timeout=1.0)
-                    if process.is_alive():
-                        process.kill()
-                        process.join()
-                pytest.fail("fetch_and_run_job timed out after 10s - DEADLOCK 
DETECTED. ")
-            finally:
-                if process.is_alive():
-                    process.join(timeout=1.0)
-                    if process.is_alive():
-                        process.terminate()
-                        process.join(timeout=1.0)
-
-        # If we reach here without timeout, the deadlock was not triggered
-        assert mock_jobs_set_state.call_count >= 1
-        mock_push_log_chunks.assert_called()
-        assert len(worker_with_job.jobs) <= 1  # new job removed, original 
fixture job still there
+        mock_jobs_fetch.assert_called_once()
+        mock_push_log_chunks.assert_called_once()
+        assert mock_jobs_set_state.call_args_list[-1].args[1] == 
TaskInstanceState.FAILED
+        log_chunk_data = mock_logs_push.call_args.kwargs["log_chunk_data"]
+        assert "Task fork exited with code 1" in log_chunk_data
+        assert "RuntimeError: supervisor crashed" in log_chunk_data
+        assert not error_file_path.exists()
 
     @patch("airflow.providers.edge3.cli.worker.jobs_fetch")
-    @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job", 
return_value=(Process(), Queue()))
+    @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job")
     @patch("airflow.providers.edge3.cli.worker.jobs_set_state")
     @patch("airflow.providers.edge3.cli.worker.EdgeWorker._push_logs_in_chunks")
     @patch("airflow.providers.edge3.cli.worker.logs_push")
     @patch.object(Job, "is_running", property(lambda _: False))
     @patch.object(Job, "is_success", property(lambda _: False))
-    @patch("traceback.format_exception", return_value=[])
     @pytest.mark.asyncio
     async def test_fetch_and_run_job_one_job_fail(
         self,
-        mock_traceback,
         mock_logs_push,
         mock_push_log_chunks,
         mock_jobs_set_state,
         mock_launch_job,
         mock_jobs_fetch,
+        tmp_path: Path,
         worker_with_job: EdgeWorker,
     ):
-        mock_jobs_fetch.side_effect = [
-            EdgeJobFetched(
-                dag_id="test",
-                task_id="test",
-                run_id="test",
-                map_index=-1,
-                try_number=1,
-                concurrency_slots=1,
-                command=MOCK_COMMAND,  # type: ignore[arg-type]
-            ),
-            None,
-        ]
+        edge_job = EdgeJobFetched(
+            dag_id="test",
+            task_id="test",
+            run_id="test",
+            map_index=-1,
+            try_number=1,
+            concurrency_slots=1,
+            command=MOCK_COMMAND,  # type: ignore[arg-type]
+        )
+        mock_jobs_fetch.side_effect = [edge_job, None]
+        mock_launch_job.return_value = Job(edge_job, _MockProcess(), tmp_path / "mock.log")
         worker_with_job.concurrency = 1  # only one job at a time
         assert worker_with_job.free_concurrency == 0
 
         await worker_with_job.fetch_and_run_job()
 
         mock_jobs_fetch.assert_called_once()
-        mock_launch_job.assert_called_once()
+        mock_launch_job.assert_called_once_with(
+            edge_job, edge_job.command, Path(worker_with_job.base_log_folder, "mock.log")
+        )
         assert mock_jobs_set_state.call_count == 2
         mock_push_log_chunks.assert_called_once()
         assert len(worker_with_job.jobs) == 1  # no new job added (was removed at the end...)
         mock_logs_push.assert_called_once()
 
+    @patch("airflow.providers.edge3.cli.worker.jobs_fetch")
+    @patch("airflow.providers.edge3.cli.worker.jobs_set_state")
+    @patch("airflow.providers.edge3.cli.worker.EdgeWorker._push_logs_in_chunks")
+    @patch("airflow.providers.edge3.cli.worker.logs_push")
+    @pytest.mark.asyncio
+    async def test_fetch_and_run_job_subprocess_failure_pushes_stderr_to_logs(
+        self,
+        mock_logs_push,
+        mock_push_log_chunks,
+        mock_jobs_set_state,
+        mock_jobs_fetch,
+        tmp_path: Path,
+        worker_with_job: EdgeWorker,
+    ):
+        edge_job = EdgeJobFetched(
+            dag_id="test",
+            task_id="test",
+            run_id="test",
+            map_index=-1,
+            try_number=1,
+            concurrency_slots=1,
+            command=MOCK_COMMAND,  # type: ignore[arg-type]
+        )
+        mock_jobs_fetch.return_value = edge_job
+        worker_with_job.concurrency = 1
+        process = _MockPopen(returncode=1, pid=5678)
+        stderr_file_path = tmp_path / "subprocess-stderr.log"
+        stderr_file_path.write_text("ModuleNotFoundError: No module named 'common'\n")
+        launched_job = Job(edge_job, process, tmp_path / "mock.log", stderr_file_path=stderr_file_path)
+
+        with patch.object(worker_with_job, "_launch_job", return_value=launched_job):
+            await worker_with_job.fetch_and_run_job()
+
+        mock_jobs_fetch.assert_called_once()
+        mock_push_log_chunks.assert_called_once()
+        assert mock_jobs_set_state.call_args_list[-1].args[1] == TaskInstanceState.FAILED
+        log_chunk_data = mock_logs_push.call_args.kwargs["log_chunk_data"]
+        assert "Task subprocess exited with code 1" in log_chunk_data
+        assert "ModuleNotFoundError: No module named 'common'" in log_chunk_data
+        assert not stderr_file_path.exists()
+
     @time_machine.travel(datetime.now(), tick=False)
     @patch("airflow.providers.edge3.cli.worker.logs_push")
     @pytest.mark.asyncio
@@ -929,7 +1071,7 @@ class TestSignalHandling:
         ):
             rc = worker._run_job_via_supervisor(
                 workload=self._make_workload(),
-                results_queue=mock.MagicMock(),
+                error_file_path=tmp_path / "fork-error.log",
             )
         assert rc == 0
         assert [c.args[0] for c in order.call_args_list] == ["reset", "setpgrp", "supervise"]


Reply via email to