This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 12a9985bd5f Add better logging to ResumableJobMixin for crash recovery observability (#68206)
12a9985bd5f is described below

commit 12a9985bd5f13c412456e32c8688f042ff686e36
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jun 9 12:31:01 2026 +0530

    Add better logging to ResumableJobMixin for crash recovery observability (#68206)
---
 .../src/airflow/sdk/bases/resumablejobmixin.py     | 33 ++++++++++++++++------
 .../tests/task_sdk/bases/test_resumablemixin.py    | 30 ++++++++++++++++++++
 2 files changed, 55 insertions(+), 8 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
index 55a561c030d..5a6e9dad7a4 100644
--- a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
+++ b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
@@ -103,31 +103,48 @@ class ResumableJobMixin:
         """
         task_store = context.get("task_store")
 
-        if task_store is not None:
+        if task_store is None:
+            self.log.warning("task_store not available in context, crash recovery is disabled for this run")
+        else:
             external_id = task_store.get(self.external_id_key)
             if external_id:
                 status = self.get_job_status(external_id, context)
                 if self.is_job_active(status):
                     self.log.info(
-                        "Reconnecting to existing job identified by: %s (status: %s)", external_id, status
+                        "Reconnecting to existing job",
+                        external_id_key=self.external_id_key,
+                        external_id=external_id,
+                        status=status,
                     )
                     return self.poll_until_complete(external_id, context)
                 if self.is_job_succeeded(status):
                     self.log.info(
-                        "Job with identifier: %s already completed successfully, skipping resubmission",
-                        external_id,
+                        "Job already completed successfully, skipping resubmission",
+                        external_id_key=self.external_id_key,
+                        external_id=external_id,
                     )
                     return self.get_job_result(external_id, context)
-                self.log.info(
-                    "Prior job with identifier: %s in terminal state %s, resubmitting fresh",
-                    external_id,
-                    status,
+                self.log.warning(
+                    "Prior job in terminal state, resubmitting fresh",
+                    external_id_key=self.external_id_key,
+                    external_id=external_id,
+                    status=status,
+                )
+            else:
+                self.log.debug(
+                    "No stored external ID found; submitting fresh job",
+                    external_id_key=self.external_id_key,
                 )
 
         external_id = self.submit_job(context)
 
         if task_store is not None and external_id is not None:
             task_store.set(self.external_id_key, external_id)
+            self.log.debug(
+                "Persisted external ID to task store",
+                external_id_key=self.external_id_key,
+                external_id=external_id,
+            )
 
         self.poll_until_complete(external_id, context)
         return self.get_job_result(external_id, context)
diff --git a/task-sdk/tests/task_sdk/bases/test_resumablemixin.py b/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
index 999d041f07a..fc9c21609c1 100644
--- a/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
+++ b/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 from typing import TYPE_CHECKING
 
 import pytest
+import structlog.testing
 
 from airflow.sdk import ResumableJobMixin
 from airflow.sdk.bases.operator import BaseOperator
@@ -197,3 +198,32 @@ class TestExternalIdKey:
         op.execute_resumable(make_context(task_state))
 
         assert task_state.get("my_custom_key") == "job-001"
+
+
+class TestLogging:
+    def test_warning_when_task_store_unavailable(self):
+        op = ConcreteResumableOperator(task_id="test_task")
+        with structlog.testing.capture_logs() as logs:
+            op.execute_resumable(make_context(task_store=None))
+        warnings = [entry for entry in logs if entry["log_level"] == "warning"]
+        assert any("crash recovery is disabled" in entry["event"] for entry in warnings)
+
+    @pytest.mark.parametrize(
+        ("status", "event_fragment", "log_level", "extra_fields"),
+        [
+            ("RUNNING", "Reconnecting", "info", {"external_id_key": "test_job_id", "status": "RUNNING"}),
+            ("SUCCEEDED", "already completed", "info", {"external_id_key": "test_job_id"}),
+            ("FAILED", "terminal state", "warning", {"status": "FAILED"}),
+        ],
+    )
+    def test_log_fields_for_stored_job(self, status, event_fragment, log_level, extra_fields):
+        op = ConcreteResumableOperator(task_id="test_task")
+        op._status_map["job-001"] = status
+        with structlog.testing.capture_logs() as logs:
+            op.execute_resumable(make_context(FakeTaskState({"test_job_id": "job-001"})))
+        entry = next((e for e in logs if event_fragment in e["event"]), None)
+        assert entry is not None
+        assert entry["log_level"] == log_level
+        assert entry["external_id"] == "job-001"
+        for key, val in extra_fields.items():
+            assert entry[key] == val

Reply via email to