This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 12a9985bd5f Add better logging to ResumableJobMixin for crash recovery observability (#68206)
12a9985bd5f is described below
commit 12a9985bd5f13c412456e32c8688f042ff686e36
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jun 9 12:31:01 2026 +0530
Add better logging to ResumableJobMixin for crash recovery observability (#68206)
---
.../src/airflow/sdk/bases/resumablejobmixin.py | 33 ++++++++++++++++------
.../tests/task_sdk/bases/test_resumablemixin.py | 30 ++++++++++++++++++++
2 files changed, 55 insertions(+), 8 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
index 55a561c030d..5a6e9dad7a4 100644
--- a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
+++ b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
@@ -103,31 +103,48 @@ class ResumableJobMixin:
"""
task_store = context.get("task_store")
- if task_store is not None:
+ if task_store is None:
+ self.log.warning("task_store not available in context, crash
recovery is disabled for this run")
+ else:
external_id = task_store.get(self.external_id_key)
if external_id:
status = self.get_job_status(external_id, context)
if self.is_job_active(status):
self.log.info(
- "Reconnecting to existing job identified by: %s
(status: %s)", external_id, status
+ "Reconnecting to existing job",
+ external_id_key=self.external_id_key,
+ external_id=external_id,
+ status=status,
)
return self.poll_until_complete(external_id, context)
if self.is_job_succeeded(status):
self.log.info(
- "Job with identifier: %s already completed
successfully, skipping resubmission",
- external_id,
+ "Job already completed successfully, skipping
resubmission",
+ external_id_key=self.external_id_key,
+ external_id=external_id,
)
return self.get_job_result(external_id, context)
- self.log.info(
- "Prior job with identifier: %s in terminal state %s,
resubmitting fresh",
- external_id,
- status,
+ self.log.warning(
+ "Prior job in terminal state, resubmitting fresh",
+ external_id_key=self.external_id_key,
+ external_id=external_id,
+ status=status,
+ )
+ else:
+ self.log.debug(
+ "No stored external ID found; submitting fresh job",
+ external_id_key=self.external_id_key,
)
external_id = self.submit_job(context)
if task_store is not None and external_id is not None:
task_store.set(self.external_id_key, external_id)
+ self.log.debug(
+ "Persisted external ID to task store",
+ external_id_key=self.external_id_key,
+ external_id=external_id,
+ )
self.poll_until_complete(external_id, context)
return self.get_job_result(external_id, context)
diff --git a/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
b/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
index 999d041f07a..fc9c21609c1 100644
--- a/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
+++ b/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
@@ -19,6 +19,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
+import structlog.testing
from airflow.sdk import ResumableJobMixin
from airflow.sdk.bases.operator import BaseOperator
@@ -197,3 +198,32 @@ class TestExternalIdKey:
op.execute_resumable(make_context(task_state))
assert task_state.get("my_custom_key") == "job-001"
+
+
+class TestLogging:
+ def test_warning_when_task_store_unavailable(self):
+ op = ConcreteResumableOperator(task_id="test_task")
+ with structlog.testing.capture_logs() as logs:
+ op.execute_resumable(make_context(task_store=None))
+ warnings = [entry for entry in logs if entry["log_level"] == "warning"]
+ assert any("crash recovery is disabled" in entry["event"] for entry in
warnings)
+
+ @pytest.mark.parametrize(
+ ("status", "event_fragment", "log_level", "extra_fields"),
+ [
+ ("RUNNING", "Reconnecting", "info", {"external_id_key":
"test_job_id", "status": "RUNNING"}),
+ ("SUCCEEDED", "already completed", "info", {"external_id_key":
"test_job_id"}),
+ ("FAILED", "terminal state", "warning", {"status": "FAILED"}),
+ ],
+ )
+ def test_log_fields_for_stored_job(self, status, event_fragment,
log_level, extra_fields):
+ op = ConcreteResumableOperator(task_id="test_task")
+ op._status_map["job-001"] = status
+ with structlog.testing.capture_logs() as logs:
+ op.execute_resumable(make_context(FakeTaskState({"test_job_id":
"job-001"})))
+ entry = next((e for e in logs if event_fragment in e["event"]), None)
+ assert entry is not None
+ assert entry["log_level"] == log_level
+ assert entry["external_id"] == "job-001"
+ for key, val in extra_fields.items():
+ assert entry[key] == val