This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6c10f4c1161 Add metrics and traces to `ResumableJobMixin` for crash recovery (#68213)
6c10f4c1161 is described below
commit 6c10f4c1161cb6e9213849f4db2e734ddf9e5111
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Jun 12 09:47:51 2026 +0530
Add metrics and traces to `ResumableJobMixin` for crash recovery (#68213)
---
.../observability/metrics/metrics_template.yaml | 35 +++++++
.../src/airflow/sdk/bases/resumablejobmixin.py | 101 +++++++++++++------
.../tests/task_sdk/bases/test_resumablemixin.py | 112 +++++++++++++++++++++
3 files changed, 217 insertions(+), 31 deletions(-)
diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
index f4fe52e5298..42e2781dcfa 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
+++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
@@ -59,6 +59,41 @@ metrics:
legacy_name: "operator_successes_{operator_name}"
name_variables: ["operator_name"]
+ - name: "resumable_job.fresh_submit"
+ description: "Number of times a ResumableJobMixin operator submitted a
fresh job
+ with no prior run ID stored (first run). Metric with operator tagging."
+ type: "counter"
+ legacy_name: "-"
+ name_variables: []
+
+ - name: "resumable_job.already_succeeded"
+ description: "Number of times a ResumableJobMixin operator found a stored
run ID
+ whose job had already completed successfully and skipped resubmission.
Metric with operator tagging."
+ type: "counter"
+ legacy_name: "-"
+ name_variables: []
+
+ - name: "resumable_job.terminal_resubmit"
+ description: "Number of times a ResumableJobMixin operator found a stored
run ID
+ whose job was in a terminal (failed) state and submitted a fresh job.
Metric with operator tagging."
+ type: "counter"
+ legacy_name: "-"
+ name_variables: []
+
+ - name: "resumable_job.reconnect_attempt"
+ description: "Number of times a ResumableJobMixin operator found a stored
run ID
+ on retry and attempted to reconnect. Metric with operator tagging."
+ type: "counter"
+ legacy_name: "-"
+ name_variables: []
+
+ - name: "resumable_job.reconnect_success"
+ description: "Number of times a ResumableJobMixin operator successfully
reconnected
+ to an active job on retry. Metric with operator tagging."
+ type: "counter"
+ legacy_name: "-"
+ name_variables: []
+
- name: "ti_failures"
description: "Overall task instances failures. Metric with dag_id and
task_id tagging."
type: "counter"
diff --git a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
index 5a6e9dad7a4..e6c587fdece 100644
--- a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
+++ b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
@@ -18,12 +18,18 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any
+from opentelemetry import trace
+
+from airflow.sdk._shared.observability.metrics import stats
+
if TYPE_CHECKING:
from pydantic import JsonValue
from airflow.sdk.definitions.context import Context
from airflow.sdk.types import Logger
+tracer = trace.get_tracer(__name__)
+
class ResumableJobMixin:
"""
@@ -101,41 +107,74 @@ class ResumableJobMixin:
Closing this window would require atomic "submit + persist", which is
not possible across
an external system boundary.
"""
- task_store = context.get("task_store")
-
- if task_store is None:
- self.log.warning("task_store not available in context, crash
recovery is disabled for this run")
- else:
- external_id = task_store.get(self.external_id_key)
- if external_id:
- status = self.get_job_status(external_id, context)
- if self.is_job_active(status):
- self.log.info(
- "Reconnecting to existing job",
- external_id_key=self.external_id_key,
- external_id=external_id,
- status=status,
- )
- return self.poll_until_complete(external_id, context)
- if self.is_job_succeeded(status):
- self.log.info(
- "Job already completed successfully, skipping
resubmission",
- external_id_key=self.external_id_key,
- external_id=external_id,
- )
- return self.get_job_result(external_id, context)
+ operator_tag = {"operator": type(self).__name__}
+ reconnect_to: Any = None
+ already_succeeded_id: Any = None
+
+ with tracer.start_as_current_span("resumable_job.resume_decision") as
span:
+ span.set_attribute("operator", type(self).__name__)
+ span.set_attribute("resumable.external_id_key",
self.external_id_key)
+
+ task_store = context.get("task_store")
+
+ if task_store is None:
+ span.set_attribute("resumable.decision", "no_task_store")
self.log.warning(
- "Prior job in terminal state, resubmitting fresh",
- external_id_key=self.external_id_key,
- external_id=external_id,
- status=status,
+ "task_store not available in context, crash recovery is
disabled for this run"
)
else:
- self.log.debug(
- "No stored external ID found; submitting fresh job",
- external_id_key=self.external_id_key,
- )
+ external_id = task_store.get(self.external_id_key)
+ if external_id:
+ stats.incr("resumable_job.reconnect_attempt",
tags=operator_tag)
+
+ status = self.get_job_status(external_id, context)
+
+ span.set_attribute("resumable.external_id",
str(external_id))
+ span.set_attribute("resumable.prior_status", status)
+
+ if self.is_job_active(status):
+ # Job is still running, skip submission and reconnect
to it.
+ span.set_attribute("resumable.decision", "reconnect")
+ stats.incr("resumable_job.reconnect_success",
tags=operator_tag)
+ self.log.info(
+ "Reconnecting to existing job",
+ external_id_key=self.external_id_key,
+ external_id=external_id,
+ status=status,
+ )
+ reconnect_to = external_id
+ elif self.is_job_succeeded(status):
+ # Job already finished successfully, skip polling and
return result directly.
+ span.set_attribute("resumable.decision",
"already_succeeded")
+ stats.incr("resumable_job.already_succeeded",
tags=operator_tag)
+ self.log.info(
+ "Job already completed successfully, skipping
resubmission",
+ external_id_key=self.external_id_key,
+ external_id=external_id,
+ )
+ already_succeeded_id = external_id
+ else:
+ # Job is in a terminal failed state, fall through and
submit a new job.
+ span.set_attribute("resumable.decision",
"terminal_resubmit")
+ stats.incr("resumable_job.terminal_resubmit",
tags=operator_tag)
+ self.log.warning(
+ "Prior job in terminal state, resubmitting fresh",
+ external_id_key=self.external_id_key,
+ external_id=external_id,
+ status=status,
+ )
+ else:
+ span.set_attribute("resumable.decision", "fresh_submit")
+ stats.incr("resumable_job.fresh_submit", tags=operator_tag)
+ self.log.debug(
+ "No stored external ID found; submitting fresh job",
+ external_id_key=self.external_id_key,
+ )
+ if reconnect_to is not None:
+ return self.poll_until_complete(reconnect_to, context)
+ if already_succeeded_id is not None:
+ return self.get_job_result(already_succeeded_id, context)
external_id = self.submit_job(context)
if task_store is not None and external_id is not None:
diff --git a/task-sdk/tests/task_sdk/bases/test_resumablemixin.py b/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
index fc9c21609c1..a920f2b5bd1 100644
--- a/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
+++ b/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
@@ -17,9 +17,14 @@
from __future__ import annotations
from typing import TYPE_CHECKING
+from unittest import mock
+from unittest.mock import MagicMock, patch
import pytest
import structlog.testing
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import
InMemorySpanExporter
from airflow.sdk import ResumableJobMixin
from airflow.sdk.bases.operator import BaseOperator
@@ -200,6 +205,113 @@ class TestExternalIdKey:
assert task_state.get("my_custom_key") == "job-001"
+class TestMetrics:
+ _PATCH = "airflow.sdk._shared.observability.metrics.stats.incr"
+ _TAG = {"operator": "ConcreteResumableOperator"}
+
+ def test_fresh_submit_fires_only_fresh_submit_counter(self):
+ op = ConcreteResumableOperator(task_id="test_task")
+ mock_incr = MagicMock()
+ with patch(self._PATCH, mock_incr):
+ op.execute_resumable(make_context(FakeTaskState()))
+ called_names = [call.args[0] for call in mock_incr.call_args_list]
+ assert called_names == ["resumable_job.fresh_submit"]
+ mock_incr.assert_called_once_with("resumable_job.fresh_submit",
tags=self._TAG)
+
+ def test_reconnect_fires_attempt_and_success(self):
+ op = ConcreteResumableOperator(task_id="test_task")
+ op._status_map["job-001"] = "RUNNING"
+ mock_incr = MagicMock()
+ with patch(self._PATCH, mock_incr):
+ op.execute_resumable(make_context(FakeTaskState({"test_job_id":
"job-001"})))
+ called_names = [call.args[0] for call in mock_incr.call_args_list]
+ assert "resumable_job.reconnect_attempt" in called_names
+ assert "resumable_job.reconnect_success" in called_names
+ assert "resumable_job.fresh_submit" not in called_names
+
+ def test_already_succeeded_fires_when_job_succeeded(self):
+ op = ConcreteResumableOperator(task_id="test_task")
+ op._status_map["job-001"] = "SUCCEEDED"
+ mock_incr = MagicMock()
+ with patch(self._PATCH, mock_incr):
+ op.execute_resumable(make_context(FakeTaskState({"test_job_id":
"job-001"})))
+ called_names = [call.args[0] for call in mock_incr.call_args_list]
+ assert "resumable_job.reconnect_attempt" in called_names
+ assert "resumable_job.already_succeeded" in called_names
+ assert "resumable_job.reconnect_success" not in called_names
+ assert "resumable_job.fresh_submit" not in called_names
+
+ def test_terminal_resubmit_fires_when_job_failed(self):
+ op = ConcreteResumableOperator(task_id="test_task")
+ op._status_map["job-001"] = "FAILED"
+ mock_incr = MagicMock()
+ with patch(self._PATCH, mock_incr):
+ op.execute_resumable(make_context(FakeTaskState({"test_job_id":
"job-001"})))
+ called_names = [call.args[0] for call in mock_incr.call_args_list]
+ assert "resumable_job.reconnect_attempt" in called_names
+ assert "resumable_job.terminal_resubmit" in called_names
+ assert "resumable_job.reconnect_success" not in called_names
+ assert "resumable_job.fresh_submit" not in called_names
+
+
+class TestTracing:
+ _MODULE_TRACER = "airflow.sdk.bases.resumablejobmixin.tracer"
+
+ def _make_tracing_provider(self) -> tuple[InMemorySpanExporter,
TracerProvider]:
+ exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(exporter))
+ return exporter, provider
+
+ def _get_decision_span(self, exporter: InMemorySpanExporter):
+ spans = exporter.get_finished_spans()
+ return next((s for s in spans if s.name ==
"resumable_job.resume_decision"), None)
+
+ def _make_tracer(self):
+ exporter, provider = self._make_tracing_provider()
+ return exporter,
provider.get_tracer("airflow.sdk.bases.resumablejobmixin")
+
+ def test_fresh_submit_span_attributes(self):
+ op = ConcreteResumableOperator(task_id="test_task")
+ exporter, module_tracer = self._make_tracer()
+ with mock.patch(self._MODULE_TRACER, module_tracer):
+ op.execute_resumable(make_context(FakeTaskState()))
+ span = self._get_decision_span(exporter)
+ assert span is not None
+ assert span.attributes["resumable.decision"] == "fresh_submit"
+ assert span.attributes["operator"] == "ConcreteResumableOperator"
+ assert span.attributes["resumable.external_id_key"] == "test_job_id"
+ assert "resumable.external_id" not in span.attributes
+
+ def test_reconnect_span_attributes(self):
+ op = ConcreteResumableOperator(task_id="test_task")
+ op._status_map["job-001"] = "RUNNING"
+ exporter, module_tracer = self._make_tracer()
+ with mock.patch(self._MODULE_TRACER, module_tracer):
+ op.execute_resumable(make_context(FakeTaskState({"test_job_id":
"job-001"})))
+ span = self._get_decision_span(exporter)
+ assert span is not None
+ assert span.attributes["resumable.decision"] == "reconnect"
+ assert span.attributes["resumable.external_id"] == "job-001"
+ assert span.attributes["resumable.prior_status"] == "RUNNING"
+
+ @pytest.mark.parametrize(
+ ("status", "expected_decision"),
+ [("SUCCEEDED", "already_succeeded"), ("FAILED", "terminal_resubmit")],
+ )
+ def test_non_active_stored_job_span_attributes(self, status,
expected_decision):
+ op = ConcreteResumableOperator(task_id="test_task")
+ op._status_map["job-001"] = status
+ exporter, module_tracer = self._make_tracer()
+ with mock.patch(self._MODULE_TRACER, module_tracer):
+ op.execute_resumable(make_context(FakeTaskState({"test_job_id":
"job-001"})))
+ span = self._get_decision_span(exporter)
+ assert span is not None
+ assert span.attributes["resumable.decision"] == expected_decision
+ assert span.attributes["resumable.external_id"] == "job-001"
+ assert span.attributes["resumable.prior_status"] == status
+
+
class TestLogging:
def test_warning_when_task_store_unavailable(self):
op = ConcreteResumableOperator(task_id="test_task")