This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c10f4c1161 Add metrics and traces to `ResumableJobMixin` for crash recovery (#68213)
6c10f4c1161 is described below

commit 6c10f4c1161cb6e9213849f4db2e734ddf9e5111
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Jun 12 09:47:51 2026 +0530

    Add metrics and traces to `ResumableJobMixin` for crash recovery (#68213)
---
 .../observability/metrics/metrics_template.yaml    |  35 +++++++
 .../src/airflow/sdk/bases/resumablejobmixin.py     | 101 +++++++++++++------
 .../tests/task_sdk/bases/test_resumablemixin.py    | 112 +++++++++++++++++++++
 3 files changed, 217 insertions(+), 31 deletions(-)

diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
index f4fe52e5298..42e2781dcfa 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
+++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
@@ -59,6 +59,41 @@ metrics:
     legacy_name: "operator_successes_{operator_name}"
     name_variables: ["operator_name"]
 
+  - name: "resumable_job.fresh_submit"
+    description: "Number of times a ResumableJobMixin operator submitted a fresh job
+    with no prior run ID stored (first run). Metric with operator tagging."
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "resumable_job.already_succeeded"
+    description: "Number of times a ResumableJobMixin operator found a stored run ID
+    whose job had already completed successfully and skipped resubmission. Metric with operator tagging."
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "resumable_job.terminal_resubmit"
+    description: "Number of times a ResumableJobMixin operator found a stored run ID
+    whose job was in a terminal (failed) state and submitted a fresh job. Metric with operator tagging."
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "resumable_job.reconnect_attempt"
+    description: "Number of times a ResumableJobMixin operator found a stored run ID
+    on retry and attempted to reconnect. Metric with operator tagging."
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "resumable_job.reconnect_success"
+    description: "Number of times a ResumableJobMixin operator successfully reconnected
+    to an active job on retry. Metric with operator tagging."
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
   - name: "ti_failures"
     description: "Overall task instances failures. Metric with dag_id and task_id tagging."
     type: "counter"
diff --git a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
index 5a6e9dad7a4..e6c587fdece 100644
--- a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
+++ b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
@@ -18,12 +18,18 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any
 
+from opentelemetry import trace
+
+from airflow.sdk._shared.observability.metrics import stats
+
 if TYPE_CHECKING:
     from pydantic import JsonValue
 
     from airflow.sdk.definitions.context import Context
     from airflow.sdk.types import Logger
 
+tracer = trace.get_tracer(__name__)
+
 
 class ResumableJobMixin:
     """
@@ -101,41 +107,74 @@ class ResumableJobMixin:
         Closing this window would require atomic "submit + persist", which is not possible across
         an external system boundary.
         """
-        task_store = context.get("task_store")
-
-        if task_store is None:
-            self.log.warning("task_store not available in context, crash recovery is disabled for this run")
-        else:
-            external_id = task_store.get(self.external_id_key)
-            if external_id:
-                status = self.get_job_status(external_id, context)
-                if self.is_job_active(status):
-                    self.log.info(
-                        "Reconnecting to existing job",
-                        external_id_key=self.external_id_key,
-                        external_id=external_id,
-                        status=status,
-                    )
-                    return self.poll_until_complete(external_id, context)
-                if self.is_job_succeeded(status):
-                    self.log.info(
-                        "Job already completed successfully, skipping resubmission",
-                        external_id_key=self.external_id_key,
-                        external_id=external_id,
-                    )
-                    return self.get_job_result(external_id, context)
+        operator_tag = {"operator": type(self).__name__}
+        reconnect_to: Any = None
+        already_succeeded_id: Any = None
+
+        with tracer.start_as_current_span("resumable_job.resume_decision") as span:
+            span.set_attribute("operator", type(self).__name__)
+            span.set_attribute("resumable.external_id_key", self.external_id_key)
+
+            task_store = context.get("task_store")
+
+            if task_store is None:
+                span.set_attribute("resumable.decision", "no_task_store")
                 self.log.warning(
-                    "Prior job in terminal state, resubmitting fresh",
-                    external_id_key=self.external_id_key,
-                    external_id=external_id,
-                    status=status,
+                    "task_store not available in context, crash recovery is disabled for this run"
                 )
             else:
-                self.log.debug(
-                    "No stored external ID found; submitting fresh job",
-                    external_id_key=self.external_id_key,
-                )
+                external_id = task_store.get(self.external_id_key)
+                if external_id:
+                    stats.incr("resumable_job.reconnect_attempt", tags=operator_tag)
+
+                    status = self.get_job_status(external_id, context)
+
+                    span.set_attribute("resumable.external_id", str(external_id))
+                    span.set_attribute("resumable.prior_status", status)
+
+                    if self.is_job_active(status):
+                        # Job is still running, skip submission and reconnect to it.
+                        span.set_attribute("resumable.decision", "reconnect")
+                        stats.incr("resumable_job.reconnect_success", tags=operator_tag)
+                        self.log.info(
+                            "Reconnecting to existing job",
+                            external_id_key=self.external_id_key,
+                            external_id=external_id,
+                            status=status,
+                        )
+                        reconnect_to = external_id
+                    elif self.is_job_succeeded(status):
+                        # Job already finished successfully, skip polling and return result directly.
+                        span.set_attribute("resumable.decision", "already_succeeded")
+                        stats.incr("resumable_job.already_succeeded", tags=operator_tag)
+                        self.log.info(
+                            "Job already completed successfully, skipping resubmission",
+                            external_id_key=self.external_id_key,
+                            external_id=external_id,
+                        )
+                        already_succeeded_id = external_id
+                    else:
+                        # Job is in a terminal failed state, fall through and submit a new job.
+                        span.set_attribute("resumable.decision", "terminal_resubmit")
+                        stats.incr("resumable_job.terminal_resubmit", tags=operator_tag)
+                        self.log.warning(
+                            "Prior job in terminal state, resubmitting fresh",
+                            external_id_key=self.external_id_key,
+                            external_id=external_id,
+                            status=status,
+                        )
+                else:
+                    span.set_attribute("resumable.decision", "fresh_submit")
+                    stats.incr("resumable_job.fresh_submit", tags=operator_tag)
+                    self.log.debug(
+                        "No stored external ID found; submitting fresh job",
+                        external_id_key=self.external_id_key,
+                    )
 
+        if reconnect_to is not None:
+            return self.poll_until_complete(reconnect_to, context)
+        if already_succeeded_id is not None:
+            return self.get_job_result(already_succeeded_id, context)
         external_id = self.submit_job(context)
 
         if task_store is not None and external_id is not None:
diff --git a/task-sdk/tests/task_sdk/bases/test_resumablemixin.py b/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
index fc9c21609c1..a920f2b5bd1 100644
--- a/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
+++ b/task-sdk/tests/task_sdk/bases/test_resumablemixin.py
@@ -17,9 +17,14 @@
 from __future__ import annotations
 
 from typing import TYPE_CHECKING
+from unittest import mock
+from unittest.mock import MagicMock, patch
 
 import pytest
 import structlog.testing
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
 
 from airflow.sdk import ResumableJobMixin
 from airflow.sdk.bases.operator import BaseOperator
@@ -200,6 +205,113 @@ class TestExternalIdKey:
         assert task_state.get("my_custom_key") == "job-001"
 
 
+class TestMetrics:
+    _PATCH = "airflow.sdk._shared.observability.metrics.stats.incr"
+    _TAG = {"operator": "ConcreteResumableOperator"}
+
+    def test_fresh_submit_fires_only_fresh_submit_counter(self):
+        op = ConcreteResumableOperator(task_id="test_task")
+        mock_incr = MagicMock()
+        with patch(self._PATCH, mock_incr):
+            op.execute_resumable(make_context(FakeTaskState()))
+        called_names = [call.args[0] for call in mock_incr.call_args_list]
+        assert called_names == ["resumable_job.fresh_submit"]
+        mock_incr.assert_called_once_with("resumable_job.fresh_submit", tags=self._TAG)
+
+    def test_reconnect_fires_attempt_and_success(self):
+        op = ConcreteResumableOperator(task_id="test_task")
+        op._status_map["job-001"] = "RUNNING"
+        mock_incr = MagicMock()
+        with patch(self._PATCH, mock_incr):
+            op.execute_resumable(make_context(FakeTaskState({"test_job_id": "job-001"})))
+        called_names = [call.args[0] for call in mock_incr.call_args_list]
+        assert "resumable_job.reconnect_attempt" in called_names
+        assert "resumable_job.reconnect_success" in called_names
+        assert "resumable_job.fresh_submit" not in called_names
+
+    def test_already_succeeded_fires_when_job_succeeded(self):
+        op = ConcreteResumableOperator(task_id="test_task")
+        op._status_map["job-001"] = "SUCCEEDED"
+        mock_incr = MagicMock()
+        with patch(self._PATCH, mock_incr):
+            op.execute_resumable(make_context(FakeTaskState({"test_job_id": "job-001"})))
+        called_names = [call.args[0] for call in mock_incr.call_args_list]
+        assert "resumable_job.reconnect_attempt" in called_names
+        assert "resumable_job.already_succeeded" in called_names
+        assert "resumable_job.reconnect_success" not in called_names
+        assert "resumable_job.fresh_submit" not in called_names
+
+    def test_terminal_resubmit_fires_when_job_failed(self):
+        op = ConcreteResumableOperator(task_id="test_task")
+        op._status_map["job-001"] = "FAILED"
+        mock_incr = MagicMock()
+        with patch(self._PATCH, mock_incr):
+            op.execute_resumable(make_context(FakeTaskState({"test_job_id": "job-001"})))
+        called_names = [call.args[0] for call in mock_incr.call_args_list]
+        assert "resumable_job.reconnect_attempt" in called_names
+        assert "resumable_job.terminal_resubmit" in called_names
+        assert "resumable_job.reconnect_success" not in called_names
+        assert "resumable_job.fresh_submit" not in called_names
+
+
+class TestTracing:
+    _MODULE_TRACER = "airflow.sdk.bases.resumablejobmixin.tracer"
+
+    def _make_tracing_provider(self) -> tuple[InMemorySpanExporter, TracerProvider]:
+        exporter = InMemorySpanExporter()
+        provider = TracerProvider()
+        provider.add_span_processor(SimpleSpanProcessor(exporter))
+        return exporter, provider
+
+    def _get_decision_span(self, exporter: InMemorySpanExporter):
+        spans = exporter.get_finished_spans()
+        return next((s for s in spans if s.name == "resumable_job.resume_decision"), None)
+
+    def _make_tracer(self):
+        exporter, provider = self._make_tracing_provider()
+        return exporter, provider.get_tracer("airflow.sdk.bases.resumablejobmixin")
+
+    def test_fresh_submit_span_attributes(self):
+        op = ConcreteResumableOperator(task_id="test_task")
+        exporter, module_tracer = self._make_tracer()
+        with mock.patch(self._MODULE_TRACER, module_tracer):
+            op.execute_resumable(make_context(FakeTaskState()))
+        span = self._get_decision_span(exporter)
+        assert span is not None
+        assert span.attributes["resumable.decision"] == "fresh_submit"
+        assert span.attributes["operator"] == "ConcreteResumableOperator"
+        assert span.attributes["resumable.external_id_key"] == "test_job_id"
+        assert "resumable.external_id" not in span.attributes
+
+    def test_reconnect_span_attributes(self):
+        op = ConcreteResumableOperator(task_id="test_task")
+        op._status_map["job-001"] = "RUNNING"
+        exporter, module_tracer = self._make_tracer()
+        with mock.patch(self._MODULE_TRACER, module_tracer):
+            op.execute_resumable(make_context(FakeTaskState({"test_job_id": "job-001"})))
+        span = self._get_decision_span(exporter)
+        assert span is not None
+        assert span.attributes["resumable.decision"] == "reconnect"
+        assert span.attributes["resumable.external_id"] == "job-001"
+        assert span.attributes["resumable.prior_status"] == "RUNNING"
+
+    @pytest.mark.parametrize(
+        ("status", "expected_decision"),
+        [("SUCCEEDED", "already_succeeded"), ("FAILED", "terminal_resubmit")],
+    )
+    def test_non_active_stored_job_span_attributes(self, status, expected_decision):
+        op = ConcreteResumableOperator(task_id="test_task")
+        op._status_map["job-001"] = status
+        exporter, module_tracer = self._make_tracer()
+        with mock.patch(self._MODULE_TRACER, module_tracer):
+            op.execute_resumable(make_context(FakeTaskState({"test_job_id": "job-001"})))
+        span = self._get_decision_span(exporter)
+        assert span is not None
+        assert span.attributes["resumable.decision"] == expected_decision
+        assert span.attributes["resumable.external_id"] == "job-001"
+        assert span.attributes["resumable.prior_status"] == status
+
+
 class TestLogging:
     def test_warning_when_task_store_unavailable(self):
         op = ConcreteResumableOperator(task_id="test_task")

Reply via email to