This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 45db47f0e25 Bugfix/make edge resilient against log errors (#66995)
45db47f0e25 is described below

commit 45db47f0e250e54edd28db47683b0c0d3e2d146e
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat May 16 01:12:12 2026 +0200

    Bugfix/make edge resilient against log errors (#66995)
    
    * Make Edge Worker more resilient for log read errors
    
    * Make Edge Worker more resilient for log read errors, fix ruff
    
    * Add pytests
---
 .../src/airflow/providers/edge3/cli/worker.py      | 109 ++++++++++++---------
 .../edge3/tests/unit/edge3/cli/test_worker.py      |  57 +++++++++++
 2 files changed, 117 insertions(+), 49 deletions(-)

diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index fec957f06eb..1054e688e76 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -544,25 +544,34 @@ class EdgeWorker:
 
     async def _push_logs_in_chunks(self, job: Job):
         aio_logfile = anyio.Path(job.logfile)
-        if self.push_logs and await aio_logfile.exists() and (await aio_logfile.stat()).st_size > job.logsize:
-            async with aio_open(job.logfile, mode="rb") as logf:
-                await logf.seek(job.logsize, os.SEEK_SET)
-                read_data = await logf.read()
-                job.logsize += len(read_data)
-                # backslashreplace to keep not decoded characters and not raising exception
-                # replace null with question mark to fix issue during DB push
-                log_data = read_data.decode(errors="backslashreplace").replace("\x00", "\ufffd")
-                while True:
-                    chunk_data = log_data[: self.push_log_chunk_size]
-                    log_data = log_data[self.push_log_chunk_size :]
-                    if not chunk_data:
-                        break
-
-                    await logs_push(
-                        task=job.edge_job.key,
-                        log_chunk_time=timezone.utcnow(),
-                        log_chunk_data=chunk_data,
-                    )
+        try:
+            if (
+                self.push_logs
+                and await aio_logfile.exists()
+                and (await aio_logfile.stat()).st_size > job.logsize
+            ):
+                async with aio_open(job.logfile, mode="rb") as logf:
+                    await logf.seek(job.logsize, os.SEEK_SET)
+                    read_data = await logf.read()
+                    job.logsize += len(read_data)
+                    # backslashreplace to keep not decoded characters and not raising exception
+                    # replace null with question mark to fix issue during DB push
+                    log_data = read_data.decode(errors="backslashreplace").replace("\x00", "\ufffd")
+                    while True:
+                        chunk_data = log_data[: self.push_log_chunk_size]
+                        log_data = log_data[self.push_log_chunk_size :]
+                        if not chunk_data:
+                            break
+
+                        await logs_push(
+                            task=job.edge_job.key,
+                            log_chunk_time=timezone.utcnow(),
+                            log_chunk_data=chunk_data,
+                        )
+        except (FileNotFoundError, OSError):
+            logger.exception("Log file %s vanished while reading, ignoring.", job.logfile)
+            # Swallow the exception; the file may have been removed by log rotation or cleanup while we were reading it.
+            # We'll catch up on the next heartbeat/log push or file was uploaded by log integration in parallel.
 
     async def start(self):
         """Start the execution in a loop until terminated."""
@@ -670,39 +679,41 @@ class EdgeWorker:
         logfile = Path(self.base_log_folder, workload.log_path)
         job = self._launch_job(edge_job, workload, logfile)
         self.jobs.append(job)
-        await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)
+        try:
+            await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)
 
-        # As we got one job, directly fetch another one if possible
-        if self.free_concurrency > 0:
-            task = create_task(self.fetch_and_run_job())
-            self.background_tasks.add(task)
-            task.add_done_callback(self.background_tasks.discard)
+            # As we got one job, directly fetch another one if possible
+            if self.free_concurrency > 0:
+                task = create_task(self.fetch_and_run_job())
+                self.background_tasks.add(task)
+                task.add_done_callback(self.background_tasks.discard)
 
-        while job.is_running:
+            while job.is_running:
+                await self._push_logs_in_chunks(job)
+                for _ in range(0, self.job_poll_interval * 10):
+                    await sleep(0.1)
+                    if not job.is_running:
+                        break
             await self._push_logs_in_chunks(job)
-            for _ in range(0, self.job_poll_interval * 10):
-                await sleep(0.1)
-                if not job.is_running:
-                    break
-        await self._push_logs_in_chunks(job)
-
-        self.jobs.remove(job)
-        if job.is_success:
-            logger.info("Job completed: %s", job.edge_job.identifier)
-            await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
-        else:
-            ex_txt = job.failure_details()
-            logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, ex_txt)
-
-            # Push it upwards to logs for better diagnostic as well
-            await logs_push(
-                task=job.edge_job.key,
-                log_chunk_time=timezone.utcnow(),
-                log_chunk_data=f"Error executing job:\n{ex_txt}",
-            )
-            await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)
-        # Cleanup temp files used for the job
-        job.cleanup()
+
+            if job.is_success:
+                logger.info("Job completed: %s", job.edge_job.identifier)
+                await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
+            else:
+                ex_txt = job.failure_details()
+                logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, ex_txt)
+
+                # Push it upwards to logs for better diagnostic as well
+                await logs_push(
+                    task=job.edge_job.key,
+                    log_chunk_time=timezone.utcnow(),
+                    log_chunk_data=f"Error executing job:\n{ex_txt}",
+                )
+                await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)
+        finally:
+            self.jobs.remove(job)
+            # Cleanup temp files used for the job
+            job.cleanup()
 
     async def heartbeat(self, new_maintenance_comments: str | None = None) -> bool:
         """Report liveness state of worker to central site with stats."""
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index b50af2f7197..245c7431cd4 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -594,6 +594,44 @@ class TestEdgeWorker:
         assert "ModuleNotFoundError: No module named 'common'" in log_chunk_data
         assert not stderr_file_path.exists()
 
+    @patch("airflow.providers.edge3.cli.worker.jobs_set_state", side_effect=RuntimeError("set state failed"))
+    @patch("airflow.providers.edge3.cli.worker.jobs_fetch")
+    @pytest.mark.asyncio
+    async def test_fetch_and_run_job_cleans_up_when_mark_running_fails(
+        self,
+        mock_jobs_fetch,
+        _mock_jobs_set_state,
+        tmp_path: Path,
+        worker_with_job: EdgeWorker,
+    ):
+        edge_job = EdgeJobFetched(
+            dag_id="test",
+            task_id="test",
+            run_id="test",
+            map_index=-1,
+            try_number=1,
+            concurrency_slots=1,
+            command=MOCK_COMMAND,  # type: ignore[arg-type]
+        )
+        mock_jobs_fetch.return_value = edge_job
+        stderr_file_path = tmp_path / "cleanup-marker.log"
+        stderr_file_path.write_text("cleanup me")
+        launched_job = Job(
+            edge_job,
+            _MockProcess(returncode=None),
+            tmp_path / "mock.log",
+            stderr_file_path=stderr_file_path,
+        )
+
+        with (
+            patch.object(worker_with_job, "_launch_job", return_value=launched_job),
+            pytest.raises(RuntimeError, match="set state failed"),
+        ):
+            await worker_with_job.fetch_and_run_job()
+
+        assert launched_job not in worker_with_job.jobs
+        assert not stderr_file_path.exists()
+
     @time_machine.travel(datetime.now(), tick=False)
     @patch("airflow.providers.edge3.cli.worker.logs_push")
     @pytest.mark.asyncio
@@ -651,6 +689,25 @@ class TestEdgeWorker:
             task=job.edge_job.key, log_chunk_time=timezone.utcnow(), log_chunk_data="log3"
         )
 
+    @patch("airflow.providers.edge3.cli.worker.logger")
+    @patch("airflow.providers.edge3.cli.worker.logs_push")
+    @patch("airflow.providers.edge3.cli.worker.aio_open", side_effect=FileNotFoundError)
+    @pytest.mark.asyncio
+    async def test_push_logs_in_chunks_swallow_file_not_found_during_read(
+        self,
+        _mock_aio_open,
+        mock_logs_push,
+        mock_logger,
+        worker_with_job: EdgeWorker,
+    ):
+        job = EdgeWorker.jobs[0]
+        await anyio.Path(job.logfile).write_text("some log content")
+        with conf_vars({("edge", "api_url"): "https://invalid-api-test-endpoint"}):
+            await worker_with_job._push_logs_in_chunks(job)
+
+        mock_logs_push.assert_not_called()
+        mock_logger.exception.assert_called_once()
+
     @pytest.mark.parametrize(
         ("drain", "maintenance_mode", "jobs", "expected_state"),
         [

Reply via email to