Copilot commented on code in PR #67572:
URL: https://github.com/apache/airflow/pull/67572#discussion_r3306644821
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1566,6 +1571,17 @@ def emit_metrics(*, parse_time: float, dag_file_stats:
Sequence[DagFileStat]):
stats.gauge("dag_processing.total_parse_time", parse_time)
stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
stats.gauge("dag_processing.import_errors", sum(stat.import_errors for
stat in dag_file_stats))
+ # COUNT(*) on the serialized_dag table adds one DB round-trip per parse
loop.
+ # This can be expensive on large deployments; throttling this metric is a
+ # straightforward follow-up if the round-trip becomes a bottleneck.
+ # On failure, an error counter is incremented so dashboards can alert on
+ # missing samples rather than silently showing a stale last value.
+ try:
+ with create_session() as session:
+ stats.gauge("serialized_dag.count",
SerializedDagModel.get_count(session=session))
+ except SQLAlchemyError:
+ log.exception("Failed to emit serialized_dag.count metric")
+ stats.incr("serialized_dag.count_error")
Review Comment:
This introduces a new `COUNT(*)` query and session creation on every parse
loop iteration, which can become a measurable overhead at scale. Consider
throttling/sampling (e.g., emit every N loops or every T seconds), or making it
configurable so large deployments can disable/reduce the query cost while still
keeping the metric available when desired.
##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -912,3 +912,28 @@ def
test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund
# The name must have been updated in the DB.
assert updated_alert.name == "updated name"
+
+ def test_get_count_returns_zero_on_empty_table(self, session):
+ """get_count() returns 0 when no serialized DAGs are stored."""
+ db.clear_db_serialized_dags()
+ assert SDM.get_count(session=session) == 0
+
+ def test_get_count_returns_correct_value(self, dag_maker, session):
+ """get_count() returns the exact number of serialized DAGs in the
table."""
+ baseline = SDM.get_count(session=session)
+ with dag_maker("dag_count_1"):
+ pass
+ with dag_maker("dag_count_2"):
+ pass
+ # dag_maker writes SerializedDagModel rows on context exit; flush to
make
+ # them visible within the same session before asserting.
+ session.flush()
+ assert SDM.get_count(session=session) == baseline + 2
+
+ def test_get_count_propagates_db_error(self, session):
+ """get_count() lets OperationalError propagate so callers can handle
DB failures."""
+ from sqlalchemy.exc import OperationalError
+
+ with mock.patch.object(session, "execute",
side_effect=OperationalError("db failure", None, None)):
Review Comment:
`OperationalError` is typically constructed with a DBAPI `orig` exception;
passing `None` can lead to odd behavior in stringification/inspection in some
SQLAlchemy versions. Prefer providing an `Exception(...)` (or a minimal
DBAPI-like exception instance) as the `orig` argument to better match
real-world error objects.
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -2790,3 +2790,36 @@ def test_bundle_version_data_stored_after_refresh(self,
session):
assert manager._bundle_versions["mock_bundle"] == "newhash"
assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+ """Tests for the emit_metrics module-level function."""
+
+ def test_emit_metrics_emits_serialized_dag_count(self):
+ """emit_metrics emits the serialized_dag.count gauge with the value
from get_count."""
+ from airflow.dag_processing.manager import emit_metrics
+
+ with mock.patch(
+ "airflow.dag_processing.manager.SerializedDagModel.get_count",
return_value=3
+ ):
+ with mock.patch("airflow.dag_processing.manager.stats") as
mock_stats:
+ emit_metrics(parse_time=1.0, dag_file_stats=[])
Review Comment:
`emit_metrics()` opens a real DB session via `create_session()` even though
this test only cares about metric emission. To keep this a true unit test
(faster and less environment-dependent), patch
`airflow.dag_processing.manager.create_session` with a lightweight context
manager stub so the test doesn’t require DB session creation.
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -2790,3 +2790,36 @@ def test_bundle_version_data_stored_after_refresh(self,
session):
assert manager._bundle_versions["mock_bundle"] == "newhash"
assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+ """Tests for the emit_metrics module-level function."""
+
+ def test_emit_metrics_emits_serialized_dag_count(self):
+ """emit_metrics emits the serialized_dag.count gauge with the value
from get_count."""
+ from airflow.dag_processing.manager import emit_metrics
+
+ with mock.patch(
+ "airflow.dag_processing.manager.SerializedDagModel.get_count",
return_value=3
+ ):
+ with mock.patch("airflow.dag_processing.manager.stats") as
mock_stats:
+ emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+ mock_stats.gauge.assert_any_call("serialized_dag.count", 3)
+
+ def test_emit_metrics_logs_and_swallows_db_error(self):
+ """emit_metrics logs, increments error counter, and swallows
SQLAlchemyError from get_count."""
+ from sqlalchemy.exc import SQLAlchemyError
+
+ from airflow.dag_processing.manager import emit_metrics
+
+ with mock.patch(
+ "airflow.dag_processing.manager.SerializedDagModel.get_count",
+ side_effect=SQLAlchemyError("db failure"),
+ ):
+ with mock.patch("airflow.dag_processing.manager.stats") as
mock_stats:
+ with mock.patch("airflow.dag_processing.manager.log") as
mock_log:
+ emit_metrics(parse_time=1.0, dag_file_stats=[])
Review Comment:
As with `test_emit_metrics_emits_serialized_dag_count`, this test will still
create a real session via `create_session()` before the patched `get_count()`
raises. Patch `airflow.dag_processing.manager.create_session` with a stub
context manager so the test isolates the error-handling path without relying
on DB session creation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]