This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bf3afcebee7 Fix Edge worker fork mode reporting supervisor failures as
success (#67887)
bf3afcebee7 is described below
commit bf3afcebee7942e9c62053aea96375f71a583696
Author: Jeongwoo Do <[email protected]>
AuthorDate: Wed Jun 3 03:54:16 2026 +0900
Fix Edge worker fork mode reporting supervisor failures as success (#67887)
* Fix Edge worker fork mode reporting supervisor failures as success
* fix logic
---
.../src/airflow/providers/edge3/cli/worker.py | 18 +++--
.../edge3/tests/unit/edge3/cli/test_worker.py | 76 ++++++++++++++++++----
2 files changed, 74 insertions(+), 20 deletions(-)
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index 1a044fdde2e..3ecb6396f2c 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -32,7 +32,7 @@ from functools import cached_property
from http import HTTPStatus
from multiprocessing import Process
from pathlib import Path
-from typing import IO, TYPE_CHECKING
+from typing import IO, TYPE_CHECKING, NoReturn
import anyio
from aiofiles import open as aio_open
@@ -422,7 +422,7 @@ class EdgeWorker:
return EdgeWorkerState.MAINTENANCE_MODE
return EdgeWorkerState.IDLE
- def _run_job_via_supervisor(self, workload: ExecuteTypeBody,
error_file_path: Path) -> int:
+ def _run_job_via_supervisor(self, workload: ExecuteTypeBody,
error_file_path: Path) -> NoReturn:
"""Run a task by calling the supervisor directly (executes inside a
forked child process)."""
_reset_parent_signal_state()
@@ -435,7 +435,7 @@ class EdgeWorker:
if AIRFLOW_V_3_3_PLUS:
from airflow.executors.base_executor import BaseExecutor
- BaseExecutor.run_workload(
+ exit_code = BaseExecutor.run_workload(
workload=workload,
server=self._execution_api_server_url,
)
@@ -448,7 +448,7 @@ class EdgeWorker:
f"dag_id={ti.dag_id} task_id={ti.task_id}
run_id={ti.run_id} map_index={ti.map_index} "
f"try_number={ti.try_number}"
)
- supervise(
+ exit_code = supervise(
# This is the "wrong" ti type, but it duck types the same.
TODO: Create a protocol for this.
# Same like in
airflow/executors/local_executor.py:_execute_workload()
ti=ti, # type: ignore[arg-type]
@@ -458,12 +458,17 @@ class EdgeWorker:
server=self._execution_api_server_url,
log_path=workload.log_path,
)
- return 0
except Exception:
logger.exception("Task execution failed")
with suppress(Exception):
error_file_path.write_text(traceback.format_exc())
- return 1
+ exit_code = 1
+
+ # Exit explicitly so the real exit code propagates to the parent process.
+ # Without this the child would always exit 0, so a failed supervisor
+ # (non-zero ``exit_code``, e.g. when ``run_workload`` reports a task failure without raising)
+ # would be misreported as success by the parent's ``Job.is_success`` check.
+ sys.exit(exit_code)
def _launch_job_subprocess(self, workload: ExecuteTypeBody) ->
tuple[subprocess.Popen, Path]:
"""Launch workload via a fresh Python interpreter
(subprocess.Popen)."""
@@ -700,6 +705,7 @@ class EdgeWorker:
break
await self._push_logs_in_chunks(job)
+ logger.info("The code is changed: %s", job.edge_job.identifier)
if job.is_success:
logger.info("Job completed: %s", job.edge_job.identifier)
await jobs_set_state(job.edge_job.key,
TaskInstanceState.SUCCESS)
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py
b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 23a247d8a08..bbbf2690faf 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -363,12 +363,15 @@ class TestEdgeWorker:
worker_with_job: EdgeWorker,
tmp_path: Path,
):
+ mock_supervise.return_value = 0
worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
error_file_path = tmp_path / "fork-error.log"
- result = worker_with_job._run_job_via_supervisor(edge_job.command,
error_file_path)
+ with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
+ worker_with_job._run_job_via_supervisor(edge_job.command,
error_file_path)
- assert result == 0
+ # The child process must exit 0 on success so the parent's Job.is_success check passes.
+ assert exc_info.value.code == 0
assert not error_file_path.exists() # no error written on success
@patch("airflow.executors.base_executor.BaseExecutor.run_workload")
@@ -382,12 +385,15 @@ class TestEdgeWorker:
worker_with_job: EdgeWorker,
tmp_path: Path,
):
+ mock_run_workload.return_value = 0
worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
error_file_path = tmp_path / "fork-error.log"
- result = worker_with_job._run_job_via_supervisor(edge_job.command,
error_file_path)
+ with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
+ worker_with_job._run_job_via_supervisor(edge_job.command,
error_file_path)
- assert result == 0
+ # The child process must exit 0 on success so the parent's Job.is_success check passes.
+ assert exc_info.value.code == 0
assert not error_file_path.exists() # no error written on success
@patch("airflow.sdk.execution_time.supervisor.supervise")
@@ -403,9 +409,10 @@ class TestEdgeWorker:
worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
error_file_path = tmp_path / "fork-error.log"
- result = worker_with_job._run_job_via_supervisor(edge_job.command,
error_file_path)
+ with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
+ worker_with_job._run_job_via_supervisor(edge_job.command,
error_file_path)
- assert result == 1
+ assert exc_info.value.code == 1
assert error_file_path.exists()
assert "Supervise failed" in error_file_path.read_text()
@@ -424,12 +431,52 @@ class TestEdgeWorker:
worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
error_file_path = tmp_path / "fork-error.log"
- result = worker_with_job._run_job_via_supervisor(edge_job.command,
error_file_path)
+ with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
+ worker_with_job._run_job_via_supervisor(edge_job.command,
error_file_path)
- assert result == 1
+ assert exc_info.value.code == 1
assert error_file_path.exists()
assert "Supervise failed" in error_file_path.read_text()
+ @patch("airflow.executors.base_executor.BaseExecutor.run_workload")
+ @pytest.mark.skipif(not hasattr(os, "fork"), reason="Requires the fork
start method")
+ @pytest.mark.skipif(
+ not AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow >= 3.3.0 where
BaseExecutor.run_workload is used"
+ )
+ def test_fork_child_exits_nonzero_when_supervisor_raises(
+ self,
+ mock_run_workload,
+ worker_with_job: EdgeWorker,
+ tmp_path: Path,
+ ):
+ """
+ A supervisor exception must make the forked child terminate with a
non-zero exit code so the
+ parent's Job.is_success reports the failure.
+ """
+ import multiprocessing
+
+ mock_run_workload.side_effect = RuntimeError("supervisor crashed")
+ worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
+ edge_job = worker_with_job.jobs.pop().edge_job
+ error_file_path = tmp_path / "fork-error.log"
+
+ # Use the fork context explicitly so the child inherits the patched
run_workload in memory.
+ process = multiprocessing.get_context("fork").Process(
+ target=worker_with_job._run_job_via_supervisor,
+ kwargs={"workload": edge_job.command, "error_file_path":
error_file_path},
+ )
+ process.start()
+ process.join(timeout=30)
+
+ # With the previous ``return 1`` the child would have exited with code 0; ``sys.exit(1)`` propagates the non-zero code.
+ assert process.exitcode == 1
+ # And the parent's success check therefore reports failure instead of
a false success.
+ job = Job(edge_job=edge_job, process=process, logfile=tmp_path /
"file.log") # type: ignore[arg-type]
+ assert job.is_success is False
+ # Confirm the non-zero exit came from the supervisor failure path (not
an unrelated early error).
+ assert error_file_path.exists()
+ assert "supervisor crashed" in error_file_path.read_text()
+
@patch("airflow.providers.edge3.cli.worker.jobs_fetch")
@patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job")
@pytest.mark.asyncio
@@ -1148,14 +1195,15 @@ class TestSignalHandling:
mock.patch("os.setpgrp", side_effect=lambda: order("setpgrp")),
mock.patch(
"airflow.executors.base_executor.BaseExecutor.run_workload",
- side_effect=lambda **_: order("supervise"),
+ side_effect=lambda **_: (order("supervise"), 0)[1],
),
):
- rc = worker._run_job_via_supervisor(
- workload=self._make_workload(),
- error_file_path=tmp_path / "fork-error.log",
- )
- assert rc == 0
+ with pytest.raises(SystemExit) as exc_info:
+ worker._run_job_via_supervisor(
+ workload=self._make_workload(),
+ error_file_path=tmp_path / "fork-error.log",
+ )
+ assert exc_info.value.code == 0
assert [c.args[0] for c in order.call_args_list] == ["reset",
"setpgrp", "supervise"]
def test_shutdown_handler_is_idempotent(self, worker_with_one_job):