This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 45db47f0e25 Bugfix/make edge resilient against log errors (#66995)
45db47f0e25 is described below
commit 45db47f0e250e54edd28db47683b0c0d3e2d146e
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat May 16 01:12:12 2026 +0200
Bugfix/make edge resilient against log errors (#66995)
* Make Edge Worker more resilient for log read errors
* Make Edge Worker more resilient for log read errors, fix ruff
* Add pytests
---
.../src/airflow/providers/edge3/cli/worker.py | 109 ++++++++++++---------
.../edge3/tests/unit/edge3/cli/test_worker.py | 57 +++++++++++
2 files changed, 117 insertions(+), 49 deletions(-)
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index fec957f06eb..1054e688e76 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -544,25 +544,34 @@ class EdgeWorker:
async def _push_logs_in_chunks(self, job: Job):
aio_logfile = anyio.Path(job.logfile)
- if self.push_logs and await aio_logfile.exists() and (await
aio_logfile.stat()).st_size > job.logsize:
- async with aio_open(job.logfile, mode="rb") as logf:
- await logf.seek(job.logsize, os.SEEK_SET)
- read_data = await logf.read()
- job.logsize += len(read_data)
- # backslashreplace to keep not decoded characters and not
raising exception
- # replace null with question mark to fix issue during DB push
- log_data =
read_data.decode(errors="backslashreplace").replace("\x00", "\ufffd")
- while True:
- chunk_data = log_data[: self.push_log_chunk_size]
- log_data = log_data[self.push_log_chunk_size :]
- if not chunk_data:
- break
-
- await logs_push(
- task=job.edge_job.key,
- log_chunk_time=timezone.utcnow(),
- log_chunk_data=chunk_data,
- )
+ try:
+ if (
+ self.push_logs
+ and await aio_logfile.exists()
+ and (await aio_logfile.stat()).st_size > job.logsize
+ ):
+ async with aio_open(job.logfile, mode="rb") as logf:
+ await logf.seek(job.logsize, os.SEEK_SET)
+ read_data = await logf.read()
+ job.logsize += len(read_data)
+ # backslashreplace keeps undecodable bytes as escape sequences instead of raising an exception
+ # replace null characters with U+FFFD (the Unicode replacement character) to avoid issues during DB push
+ log_data =
read_data.decode(errors="backslashreplace").replace("\x00", "\ufffd")
+ while True:
+ chunk_data = log_data[: self.push_log_chunk_size]
+ log_data = log_data[self.push_log_chunk_size :]
+ if not chunk_data:
+ break
+
+ await logs_push(
+ task=job.edge_job.key,
+ log_chunk_time=timezone.utcnow(),
+ log_chunk_data=chunk_data,
+ )
+ except (FileNotFoundError, OSError):
+ logger.exception("Log file %s vanished while reading, ignoring.",
job.logfile)
+ # Swallow the exception; the file may have been removed by log
rotation or cleanup while we were reading it.
+ # We'll catch up on the next heartbeat/log push, or the file was already uploaded by a log integration in parallel.
async def start(self):
"""Start the execution in a loop until terminated."""
@@ -670,39 +679,41 @@ class EdgeWorker:
logfile = Path(self.base_log_folder, workload.log_path)
job = self._launch_job(edge_job, workload, logfile)
self.jobs.append(job)
- await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)
+ try:
+ await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)
- # As we got one job, directly fetch another one if possible
- if self.free_concurrency > 0:
- task = create_task(self.fetch_and_run_job())
- self.background_tasks.add(task)
- task.add_done_callback(self.background_tasks.discard)
+ # As we got one job, directly fetch another one if possible
+ if self.free_concurrency > 0:
+ task = create_task(self.fetch_and_run_job())
+ self.background_tasks.add(task)
+ task.add_done_callback(self.background_tasks.discard)
- while job.is_running:
+ while job.is_running:
+ await self._push_logs_in_chunks(job)
+ for _ in range(0, self.job_poll_interval * 10):
+ await sleep(0.1)
+ if not job.is_running:
+ break
await self._push_logs_in_chunks(job)
- for _ in range(0, self.job_poll_interval * 10):
- await sleep(0.1)
- if not job.is_running:
- break
- await self._push_logs_in_chunks(job)
-
- self.jobs.remove(job)
- if job.is_success:
- logger.info("Job completed: %s", job.edge_job.identifier)
- await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
- else:
- ex_txt = job.failure_details()
- logger.error("Job failed: %s with:\n%s", job.edge_job.identifier,
ex_txt)
-
- # Push it upwards to logs for better diagnostic as well
- await logs_push(
- task=job.edge_job.key,
- log_chunk_time=timezone.utcnow(),
- log_chunk_data=f"Error executing job:\n{ex_txt}",
- )
- await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)
- # Cleanup temp files used for the job
- job.cleanup()
+
+ if job.is_success:
+ logger.info("Job completed: %s", job.edge_job.identifier)
+ await jobs_set_state(job.edge_job.key,
TaskInstanceState.SUCCESS)
+ else:
+ ex_txt = job.failure_details()
+ logger.error("Job failed: %s with:\n%s",
job.edge_job.identifier, ex_txt)
+
+ # Push it upwards to logs for better diagnostic as well
+ await logs_push(
+ task=job.edge_job.key,
+ log_chunk_time=timezone.utcnow(),
+ log_chunk_data=f"Error executing job:\n{ex_txt}",
+ )
+ await jobs_set_state(job.edge_job.key,
TaskInstanceState.FAILED)
+ finally:
+ self.jobs.remove(job)
+ # Cleanup temp files used for the job
+ job.cleanup()
async def heartbeat(self, new_maintenance_comments: str | None = None) ->
bool:
"""Report liveness state of worker to central site with stats."""
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py
b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index b50af2f7197..245c7431cd4 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -594,6 +594,44 @@ class TestEdgeWorker:
assert "ModuleNotFoundError: No module named 'common'" in
log_chunk_data
assert not stderr_file_path.exists()
+ @patch("airflow.providers.edge3.cli.worker.jobs_set_state",
side_effect=RuntimeError("set state failed"))
+ @patch("airflow.providers.edge3.cli.worker.jobs_fetch")
+ @pytest.mark.asyncio
+ async def test_fetch_and_run_job_cleans_up_when_mark_running_fails(
+ self,
+ mock_jobs_fetch,
+ _mock_jobs_set_state,
+ tmp_path: Path,
+ worker_with_job: EdgeWorker,
+ ):
+ edge_job = EdgeJobFetched(
+ dag_id="test",
+ task_id="test",
+ run_id="test",
+ map_index=-1,
+ try_number=1,
+ concurrency_slots=1,
+ command=MOCK_COMMAND, # type: ignore[arg-type]
+ )
+ mock_jobs_fetch.return_value = edge_job
+ stderr_file_path = tmp_path / "cleanup-marker.log"
+ stderr_file_path.write_text("cleanup me")
+ launched_job = Job(
+ edge_job,
+ _MockProcess(returncode=None),
+ tmp_path / "mock.log",
+ stderr_file_path=stderr_file_path,
+ )
+
+ with (
+ patch.object(worker_with_job, "_launch_job",
return_value=launched_job),
+ pytest.raises(RuntimeError, match="set state failed"),
+ ):
+ await worker_with_job.fetch_and_run_job()
+
+ assert launched_job not in worker_with_job.jobs
+ assert not stderr_file_path.exists()
+
@time_machine.travel(datetime.now(), tick=False)
@patch("airflow.providers.edge3.cli.worker.logs_push")
@pytest.mark.asyncio
@@ -651,6 +689,25 @@ class TestEdgeWorker:
task=job.edge_job.key, log_chunk_time=timezone.utcnow(),
log_chunk_data="log3"
)
+ @patch("airflow.providers.edge3.cli.worker.logger")
+ @patch("airflow.providers.edge3.cli.worker.logs_push")
+ @patch("airflow.providers.edge3.cli.worker.aio_open",
side_effect=FileNotFoundError)
+ @pytest.mark.asyncio
+ async def test_push_logs_in_chunks_swallow_file_not_found_during_read(
+ self,
+ _mock_aio_open,
+ mock_logs_push,
+ mock_logger,
+ worker_with_job: EdgeWorker,
+ ):
+ job = EdgeWorker.jobs[0]
+ await anyio.Path(job.logfile).write_text("some log content")
+ with conf_vars({("edge", "api_url"):
"https://invalid-api-test-endpoint"}):
+ await worker_with_job._push_logs_in_chunks(job)
+
+ mock_logs_push.assert_not_called()
+ mock_logger.exception.assert_called_once()
+
@pytest.mark.parametrize(
("drain", "maintenance_mode", "jobs", "expected_state"),
[