Copilot commented on code in PR #67572:
URL: https://github.com/apache/airflow/pull/67572#discussion_r3306406589
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1566,6 +1571,14 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]):
stats.gauge("dag_processing.total_parse_time", parse_time)
stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats))
+ # COUNT(*) on the serialized_dag table adds one DB round-trip per parse loop.
+ # On large installations this is typically fast (index scan on the PK), but
+ # we isolate the call so that a transient DB error never kills the parse loop.
+ try:
+ with create_session() as session:
+ stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session))
+ except SQLAlchemyError:
+ log.exception("Failed to emit serialized_dag.count metric")
Review Comment:
Emitting `COUNT(*)` every parse loop can become expensive on large
`serialized_dag` tables and may add sustained DB load (often a full table scan
depending on the DB/visibility state). Consider reducing frequency (e.g., only
every N loops / time-based throttle), caching the value, or using an estimated
count mechanism where appropriate to avoid making parsing throughput dependent
on this query.
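A minimal sketch of the time-based throttle suggested above, assuming a plain callable for the query and a statsd-style `gauge(name, value)` callable for emission. `ThrottledGauge`, `fetch`, `emit`, `interval` and `clock` are hypothetical names, not Airflow API; in the PR, `fetch` would presumably wrap `SerializedDagModel.get_count` inside a session.

```python
import time
from typing import Callable, Optional


class ThrottledGauge:
    """Emit a gauge on every call, but re-run the expensive query at most once per interval.

    Hypothetical helper for illustration only; it is not part of Airflow.
    """

    def __init__(
        self,
        fetch: Callable[[], int],
        emit: Callable[[str, int], None],
        name: str,
        interval: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch  # e.g. a wrapper around a COUNT(*) query
        self._emit = emit  # e.g. a statsd-style gauge(name, value)
        self._name = name
        self._interval = interval  # seconds between real queries
        self._clock = clock  # injectable so tests can control time
        self._last_attempt: Optional[float] = None
        self._cached: Optional[int] = None

    def maybe_emit(self) -> None:
        now = self._clock()
        if self._last_attempt is None or now - self._last_attempt >= self._interval:
            # Record the attempt before querying so a failing query is also retried
            # at most once per interval; exceptions propagate to the caller, which
            # keeps its own try/except around this call.
            self._last_attempt = now
            self._cached = self._fetch()
        if self._cached is not None:
            # Between refreshes, re-emit the last known value.
            self._emit(self._name, self._cached)


# Usage with a fake clock: loops at t=0, 30 and 61 run the query twice, emit three times.
fake_now = [0.0]
gauge = ThrottledGauge(
    fetch=lambda: 42,
    emit=lambda name, value: print(name, value),
    name="serialized_dag.count",
    interval=60.0,
    clock=lambda: fake_now[0],
)
for t in (0.0, 30.0, 61.0):
    fake_now[0] = t
    gauge.maybe_emit()
```

Caching the last value keeps the gauge continuous between refreshes, while parse-loop throughput no longer depends on the query running every iteration.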
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1566,6 +1571,14 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]):
stats.gauge("dag_processing.total_parse_time", parse_time)
stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats))
+ # COUNT(*) on the serialized_dag table adds one DB round-trip per parse loop.
+ # On large installations this is typically fast (index scan on the PK), but
+ # we isolate the call so that a transient DB error never kills the parse loop.
Review Comment:
The comment asserts the count is 'typically fast (index scan on the PK)'.
`COUNT(*)` is not guaranteed to use a PK index scan and often uses a sequential
scan; some engines can use index-only scans under specific conditions. Suggest
rewording to avoid implying a specific query plan (e.g., 'may be fast, but can
be expensive on large tables').
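To see that the plan for `COUNT(*)` is chosen by the engine rather than fixed to the PK, one can ask the planner directly. This sketch uses SQLite's `EXPLAIN QUERY PLAN` from the standard library only; the `demo` table is a hypothetical stand-in, not Airflow's real `serialized_dag` schema, and PostgreSQL or MySQL would need their own `EXPLAIN` and may choose differently.

```python
import sqlite3


def count_plan(conn: sqlite3.Connection, table: str) -> list[str]:
    """Return the 'detail' column of SQLite's EXPLAIN QUERY PLAN for COUNT(*)."""
    # The table name is interpolated directly; acceptable only for trusted demo input.
    rows = conn.execute(f"EXPLAIN QUERY PLAN SELECT COUNT(*) FROM {table}").fetchall()
    # Each row has four columns; the last one is the human-readable plan step.
    return [row[3] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY, data TEXT)")
print(count_plan(conn, "demo"))  # plan with only the rowid primary key

conn.execute("CREATE INDEX demo_data_idx ON demo (data)")
print(count_plan(conn, "demo"))  # the planner may now prefer the secondary index
```

The exact text of each plan step varies across SQLite versions, which is itself the reviewer's point: the code comment should not promise a specific plan.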
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -2790,3 +2790,37 @@ def test_bundle_version_data_stored_after_refresh(self, session):
assert manager._bundle_versions["mock_bundle"] == "newhash"
assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+ """Tests for the emit_metrics module-level function."""
+
+ def test_emit_metrics_emits_serialized_dag_count(self):
+ """emit_metrics emits the serialized_dag.count gauge with the value from get_count."""
+ from airflow.dag_processing.manager import emit_metrics
+
+ with mock.patch(
+ "airflow.dag_processing.manager.SerializedDagModel.get_count", return_value=3
+ ):
+ with mock.patch("airflow.dag_processing.manager.stats") as mock_stats:
+ emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+ calls = {call[0][0]: call[0][1] for call in mock_stats.gauge.call_args_list}
+ assert "serialized_dag.count" in calls
+ assert calls["serialized_dag.count"] == 3
Review Comment:
This assertion pattern can accidentally mask duplicate emissions because it
folds the call list into a dict (later calls overwrite earlier ones). It’s more
precise to assert the expected metric call directly (e.g., assert that `gauge`
was called with `("serialized_dag.count", 3)` among its calls) instead of
post-processing `call_args_list`.
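A sketch of the direct-assertion style, using only `unittest.mock`. `fake_emit_metrics` is a hypothetical stand-in for `emit_metrics` so the example runs without Airflow; in the real test, `mock_stats` would be the object yielded by `mock.patch("airflow.dag_processing.manager.stats")`.

```python
from unittest import mock


def fake_emit_metrics(stats) -> None:
    # Hypothetical stand-in for emit_metrics: emits two gauges once each.
    stats.gauge("dagbag_size", 0)
    stats.gauge("serialized_dag.count", 3)


mock_stats = mock.MagicMock()
fake_emit_metrics(mock_stats)

# Fails with a readable message if the expected call never happened.
mock_stats.gauge.assert_any_call("serialized_dag.count", 3)

# Unlike folding into a dict, counting exact matches also catches duplicate emissions.
expected = mock.call("serialized_dag.count", 3)
assert mock_stats.gauge.call_args_list.count(expected) == 1
```

If the gauge were emitted twice, the dict-based version would still pass, whereas the final assertion here would fail.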
##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -912,3 +912,27 @@ def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund
# The name must have been updated in the DB.
assert updated_alert.name == "updated name"
+
+ def test_get_count_returns_zero_on_empty_table(self, session):
+ """get_count() returns 0 when no serialized DAGs are stored."""
+ assert SDM.get_count(session=session) == 0
+
+ def test_get_count_returns_correct_value(self, dag_maker, session):
+ """get_count() returns the exact number of serialized DAGs in the table."""
+ with dag_maker("dag_count_1"):
+ pass
+ with dag_maker("dag_count_2"):
+ pass
+ # dag_maker writes SerializedDagModel rows on context exit; flush to make
+ # them visible within the same session before asserting.
+ session.flush()
+ count = SDM.get_count(session=session)
+ assert count == 2
Review Comment:
This test assumes the `serialized_dag` table is empty at the start of the
test (`assert count == 2`). If other fixtures/tests insert rows (even
implicitly), this can be flaky. A more robust approach is to record a baseline
(`start = SDM.get_count(...)`) and assert `count == start + 2`, or explicitly
clear the table within the test setup.
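The baseline approach can be sketched with an in-memory SQLite database. `get_count` and the `serialized_dag_demo` table are hypothetical stand-ins for `SDM.get_count` and the real table; the pre-inserted row simulates state left behind by another fixture.

```python
import sqlite3


def get_count(conn: sqlite3.Connection) -> int:
    # Hypothetical stand-in for SerializedDagModel.get_count.
    (count,) = conn.execute("SELECT COUNT(*) FROM serialized_dag_demo").fetchone()
    return count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag_demo (dag_id TEXT PRIMARY KEY)")
# Simulate a row left behind by some other fixture or test.
conn.execute("INSERT INTO serialized_dag_demo VALUES ('leftover_dag')")

start = get_count(conn)  # baseline: whatever the table already holds
conn.executemany(
    "INSERT INTO serialized_dag_demo VALUES (?)",
    [("dag_count_1",), ("dag_count_2",)],
)
# Holds regardless of pre-existing rows, unlike asserting an absolute count of 2.
assert get_count(conn) == start + 2
```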
--
This is an automated message from the Apache Git Service.