Copilot commented on code in PR #68068:
URL: https://github.com/apache/airflow/pull/68068#discussion_r3373213634


##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -846,7 +846,12 @@ def test_process_executor_events_ti_requeued(
         ti1.refresh_from_db(session=session)
         assert ti1.state == State.QUEUED
         self.job_runner.executor.callback_sink.send.assert_not_called()
-        mock_stats.incr.assert_not_called()
+        # Only the processed-events counter should have fired across all three 
sub-tests;
+        # no killed_externally mismatch metric should appear.
+        assert all(
+            c.args[0] == "scheduler.executor_events.processed"
+            for c in mock_stats.incr.call_args_list
+        )

Review Comment:
   The assertion uses `all(...)` over `mock_stats.incr.call_args_list` without 
checking that any metric was emitted. `all()` returns `True` for an empty 
iterable, so if `call_args_list` is empty (e.g. a future change short-circuits 
metric emission), this will still pass and won’t catch the regression. Assert 
that at least one `incr` call happened, and then assert that the only emitted 
metric name is `scheduler.executor_events.processed`.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3131,6 +3144,12 @@ def adopt_or_reset_orphaned_tasks(self, *, session: 
Session = NEW_SESSION) -> in
 
                     stats.incr("scheduler.orphaned_tasks.cleared", 
len(to_reset))
                     stats.incr("scheduler.orphaned_tasks.adopted", 
len(tis_to_adopt_or_reset) - len(to_reset))
+                    if to_reset:
+                        stats.incr(
+                            "scheduler.zombies.detected",
+                            len(to_reset),
+                            tags={"reason": "adopt_failure"},
+                        )
 
                     if to_reset:
                         task_instance_str = "\n\t".join(reset_tis_message)

Review Comment:
   There are two consecutive `if to_reset:` blocks here (one for the new 
metric, one for logging). This duplicates the condition and makes it easier for 
the two branches to drift over time. Combine them into a single `if to_reset:` 
block containing both the metric emission and the log message.



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -11552,3 +11562,163 @@ def test_reaper_ignores_terminal_states(self, 
scheduler_job_runner_for_connectio
         session.expire_all()
         assert session.get(ConnectionTestRequest, ct_success.id).state == 
ConnectionTestState.SUCCESS
         assert session.get(ConnectionTestRequest, ct_failed.id).state == 
ConnectionTestState.FAILED
+
+
+class TestSchedulerObservabilityMetrics:
+    """Tests for the scheduler observability metrics emitted in 
scheduler_job_runner.py."""
+
+    @pytest.fixture(autouse=True)
+    def per_test(self) -> Generator:
+        _clean_db()
+        self.job_runner: SchedulerJobRunner | None = None
+        yield
+        _clean_db()
+
+    # --- scheduler.loop_exceptions ---
+
+    def test_loop_exceptions_incr_on_scheduler_loop_failure(self):
+        """scheduler.loop_exceptions is emitted with exception_class tag when 
_run_scheduler_loop raises."""
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        with (
+            mock.patch("airflow.jobs.scheduler_job_runner.stats") as 
mock_stats,
+            mock.patch.object(self.job_runner, "register_signals", 
return_value=MagicMock()),
+            mock.patch.object(self.job_runner.executor, "start"),
+            mock.patch.object(self.job_runner.executor, "end"),
+            mock.patch.object(self.job_runner, "_run_scheduler_loop", 
side_effect=RuntimeError("loop crash")),
+            pytest.raises(RuntimeError, match="loop crash"),
+        ):
+            self.job_runner._execute()
+
+        mock_stats.incr.assert_any_call(
+            "scheduler.loop_exceptions", tags={"exception_class": 
"RuntimeError"}
+        )
+
+    def test_loop_exceptions_not_emitted_on_clean_exit(self):
+        """scheduler.loop_exceptions is NOT emitted when _run_scheduler_loop 
returns normally."""
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        with (
+            mock.patch("airflow.jobs.scheduler_job_runner.stats") as 
mock_stats,
+            mock.patch.object(self.job_runner, "register_signals", 
return_value=MagicMock()),
+            mock.patch.object(self.job_runner.executor, "start"),
+            mock.patch.object(self.job_runner.executor, "end"),
+            mock.patch.object(self.job_runner, "_run_scheduler_loop"),
+        ):
+            self.job_runner._execute()
+
+        emitted_names = [c.args[0] for c in mock_stats.incr.call_args_list]
+        assert "scheduler.loop_exceptions" not in emitted_names
+
+    # --- scheduler.executor_events.{batch_size,processed,failed} ---
+
+    def test_executor_events_batch_metrics_emitted_on_success(self):
+        """batch_size gauge and processed counter are emitted via the 
early-return path."""
+        # Empty event buffer → tis_with_right_state is empty → early return 
with num_events=0
+        mock_executor = MagicMock()
+        mock_executor.get_event_buffer.return_value = {}
+
+        with mock.patch("airflow.jobs.scheduler_job_runner.stats") as 
mock_stats:
+            result = SchedulerJobRunner.process_executor_events(
+                executor=mock_executor, job_id=1, 
scheduler_dag_bag=MagicMock(), session=MagicMock()
+            )
+
+        assert result == 0
+        
mock_stats.gauge.assert_called_once_with("scheduler.executor_events.batch_size",
 0)
+        
mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", 
0)
+
+    def test_executor_events_failed_metric_emitted_on_exception(self):
+        """failed counter is emitted in _process_executor_events when 
process_executor_events raises."""
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        with (
+            mock.patch("airflow.jobs.scheduler_job_runner.stats") as 
mock_stats,
+            mock.patch.object(
+                SchedulerJobRunner, "process_executor_events", 
side_effect=ValueError("executor boom")
+            ),
+            pytest.raises(ValueError, match="executor boom"),
+        ):
+            self.job_runner._process_executor_events(executor=MagicMock(), 
session=MagicMock())
+
+        mock_stats.incr.assert_called_once_with(
+            "scheduler.executor_events.failed", tags={"reason": "ValueError"}
+        )
+        mock_stats.gauge.assert_not_called()
+
+    # --- scheduler.zombies.detected ---
+
+    def test_zombies_detected_heartbeat_timeout_emitted(self):
+        """scheduler.zombies.detected{reason:heartbeat_timeout} is emitted 
when zombies are found."""
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        fake_tis = [MagicMock(), MagicMock(), MagicMock()]
+
+        mock_session = MagicMock()
+        mock_ctx = MagicMock()
+        mock_ctx.__enter__ = MagicMock(return_value=mock_session)
+        mock_ctx.__exit__ = MagicMock(return_value=False)
+
+        with (
+            mock.patch("airflow.jobs.scheduler_job_runner.stats") as 
mock_stats,
+            mock.patch("airflow.jobs.scheduler_job_runner.create_session", 
return_value=mock_ctx),
+            mock.patch.object(
+                self.job_runner, "_find_task_instances_without_heartbeats", 
return_value=fake_tis
+            ),
+            mock.patch.object(self.job_runner, 
"_purge_task_instances_without_heartbeats"),
+        ):
+            self.job_runner._find_and_purge_task_instances_without_heartbeats()
+
+        mock_stats.incr.assert_called_once_with(
+            "scheduler.zombies.detected", 3, tags={"reason": 
"heartbeat_timeout"}
+        )
+
+    def test_zombies_detected_not_emitted_when_no_heartbeat_timeout(self):
+        """scheduler.zombies.detected is NOT emitted when no zombie task 
instances are found."""
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        mock_session = MagicMock()
+        mock_ctx = MagicMock()
+        mock_ctx.__enter__ = MagicMock(return_value=mock_session)
+        mock_ctx.__exit__ = MagicMock(return_value=False)
+
+        with (
+            mock.patch("airflow.jobs.scheduler_job_runner.stats") as 
mock_stats,
+            mock.patch("airflow.jobs.scheduler_job_runner.create_session", 
return_value=mock_ctx),
+            mock.patch.object(
+                self.job_runner, "_find_task_instances_without_heartbeats", 
return_value=[]
+            ),
+        ):
+            self.job_runner._find_and_purge_task_instances_without_heartbeats()
+
+        mock_stats.incr.assert_not_called()
+
+    def test_zombies_detected_adopt_failure_emitted(self, dag_maker, session):
+        """scheduler.zombies.detected{reason:adopt_failure} is emitted for 
tasks that can't be adopted."""
+        with dag_maker(dag_id="test_zombie_adopt_failure", schedule="@daily"):
+            EmptyOperator(task_id="task1")
+
+        old_job = Job()
+        session.add(old_job)
+        session.commit()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)

Review Comment:
   This test relies on whatever default executor `SchedulerJobRunner` wires up 
from the test environment configuration. If that default executor ever 
implements successful adoption, `to_reset` may become empty and the test will 
stop exercising the `adopt_failure` metric path. To keep the test 
deterministic, pass an executor that uses the `BaseExecutor` default 
`try_adopt_task_instances`, which returns every TI as not adopted, so all of 
them are reset and counted under `reason: adopt_failure`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to