Copilot commented on code in PR #67572:
URL: https://github.com/apache/airflow/pull/67572#discussion_r3306672622
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1566,6 +1571,16 @@ def emit_metrics(*, parse_time: float, dag_file_stats:
Sequence[DagFileStat]):
stats.gauge("dag_processing.total_parse_time", parse_time)
stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
stats.gauge("dag_processing.import_errors", sum(stat.import_errors for
stat in dag_file_stats))
+ # Gated by config so large deployments can opt out of the extra DB
round-trip.
+ # On failure an error counter is incremented so dashboards can alert on
+ # missing samples rather than silently showing a stale last value.
+ if conf.getboolean("scheduler", "emit_serialized_dag_count_metric",
fallback=True):
+ try:
+ with create_session() as session:
+ stats.gauge("serialized_dag.count",
SerializedDagModel.get_count(session=session))
+ except SQLAlchemyError:
+ log.exception("Failed to emit serialized_dag.count metric")
+ stats.incr("serialized_dag.count_error")
Review Comment:
The new config-gated branch isn’t covered by tests. Add a unit test
asserting that when `conf.getboolean(...)` returns False,
`SerializedDagModel.get_count` is not called and no `serialized_dag.count`
gauge is emitted. This guards against regressions where the setting is
ignored or its check is accidentally inverted.
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -2790,3 +2790,38 @@ def test_bundle_version_data_stored_after_refresh(self,
session):
assert manager._bundle_versions["mock_bundle"] == "newhash"
assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+ """Tests for the emit_metrics module-level function."""
+
+ def test_emit_metrics_emits_serialized_dag_count(self):
+ """emit_metrics emits the serialized_dag.count gauge with the value
from get_count."""
+ from airflow.dag_processing.manager import emit_metrics
+
+ with mock.patch("airflow.dag_processing.manager.create_session"):
+ with mock.patch(
+ "airflow.dag_processing.manager.SerializedDagModel.get_count",
return_value=3
+ ):
+ with mock.patch("airflow.dag_processing.manager.stats") as
mock_stats:
+ emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+ mock_stats.gauge.assert_any_call("serialized_dag.count", 3)
+
+ def test_emit_metrics_logs_and_swallows_db_error(self):
+ """emit_metrics logs, increments error counter, and swallows
SQLAlchemyError from get_count."""
+ from sqlalchemy.exc import SQLAlchemyError
+
+ from airflow.dag_processing.manager import emit_metrics
+
+ with mock.patch("airflow.dag_processing.manager.create_session"):
+ with mock.patch(
+ "airflow.dag_processing.manager.SerializedDagModel.get_count",
+ side_effect=SQLAlchemyError("db failure"),
+ ):
+ with mock.patch("airflow.dag_processing.manager.stats") as
mock_stats:
+ with mock.patch("airflow.dag_processing.manager.log") as
mock_log:
+ emit_metrics(parse_time=1.0, dag_file_stats=[])
Review Comment:
These tests assume the runtime config leaves
`scheduler.emit_serialized_dag_count_metric` enabled. The fallback is True,
but an explicit setting in the test environment could disable it, which
would make the assertions environment-dependent and therefore flaky. Patch
`airflow.dag_processing.manager.conf.getboolean` to return True in these tests
(and False in a dedicated test) so the behavior under test is deterministic.
##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -912,3 +912,30 @@ def
test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund
# The name must have been updated in the DB.
assert updated_alert.name == "updated name"
+
+ def test_get_count_returns_zero_on_empty_table(self, session):
+ """get_count() returns 0 when no serialized DAGs are stored."""
+ db.clear_db_serialized_dags()
+ assert SDM.get_count(session=session) == 0
+
+ def test_get_count_returns_correct_value(self, dag_maker, session):
+ """get_count() returns the exact number of serialized DAGs in the
table."""
+ baseline = SDM.get_count(session=session)
+ with dag_maker("dag_count_1"):
+ pass
+ with dag_maker("dag_count_2"):
+ pass
+ # dag_maker writes SerializedDagModel rows on context exit; flush to
make
+ # them visible within the same session before asserting.
+ session.flush()
+ assert SDM.get_count(session=session) == baseline + 2
+
+ def test_get_count_propagates_db_error(self, session):
+ """get_count() lets OperationalError propagate so callers can handle
DB failures."""
+ from sqlalchemy.exc import OperationalError
+
+ with mock.patch.object(
+ session, "execute", side_effect=OperationalError("db failure",
None, Exception("db failure"))
Review Comment:
Constructing `OperationalError` with `None` as the second (`params`) argument
may be brittle, because some SQLAlchemy error formatting/representation paths
could expect a dict or tuple there. Consider passing `{}` (or a realistic
params structure) instead, so that the test cannot fail on error
stringification rather than on the intended propagation behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]