Copilot commented on code in PR #67572:
URL: https://github.com/apache/airflow/pull/67572#discussion_r3306338748
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1566,6 +1570,11 @@ def emit_metrics(*, parse_time: float, dag_file_stats:
Sequence[DagFileStat]):
stats.gauge("dag_processing.total_parse_time", parse_time)
stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
stats.gauge("dag_processing.import_errors", sum(stat.import_errors for
stat in dag_file_stats))
+ try:
+ with create_session() as session:
+ stats.gauge("serialized_dag.count",
SerializedDagModel.get_count(session=session))
+ except Exception:
+ log.exception("Failed to emit serialized_dag.count metric")
Review Comment:
Catching `Exception` here is overly broad and can mask non-DB
programming/configuration errors inside `emit_metrics()` (including failures in
`stats.gauge`). Since the intended behavior (per tests/docstrings) is to
swallow DB/count failures, narrow this to the expected exception(s) (e.g.,
`RuntimeError` from `get_count`, and optionally
`sqlalchemy.exc.SQLAlchemyError` for database-level errors) and let unexpected
exceptions propagate.
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -2790,3 +2790,33 @@ def test_bundle_version_data_stored_after_refresh(self,
session):
assert manager._bundle_versions["mock_bundle"] == "newhash"
assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+ """Tests for the emit_metrics module-level function."""
+
+ @pytest.mark.db_test
+ def test_emit_metrics_emits_serialized_dag_count(self, dag_maker, session):
+ """emit_metrics emits the serialized_dag.count gauge with the DB
count."""
+ from airflow.dag_processing.manager import emit_metrics
+
+ with dag_maker("emit_count_dag"):
+ pass
+
+ with mock.patch("airflow.dag_processing.manager.stats") as mock_stats:
+ emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+ calls = {call[0][0]: call[0][1] for call in
mock_stats.gauge.call_args_list}
+ assert "serialized_dag.count" in calls
+ assert calls["serialized_dag.count"] == 1
Review Comment:
As in the `get_count()` unit test, this test asserts `serialized_dag.count
== 1` after creating a DAG with `dag_maker`, but `emit_metrics()` queries the
`serialized_dag` table. If the test setup doesn’t actually serialize and
persist the DAG into `serialized_dag`, the gauge value will be `0` and the
assertion will fail. Make the test deterministic by explicitly writing a
`SerializedDagModel` row for the created DAG (or inserting one directly) before
calling `emit_metrics()`.
##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -912,3 +912,22 @@ def
test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund
# The name must have been updated in the DB.
assert updated_alert.name == "updated name"
+
+ def test_get_count_returns_zero_on_empty_table(self, session):
+ """get_count() returns 0 when no serialized DAGs are stored."""
+ assert SDM.get_count(session=session) == 0
+
+ def test_get_count_returns_correct_value(self, dag_maker, session):
+ """get_count() returns the exact number of serialized DAGs in the
table."""
+ with dag_maker("dag_count_1"):
+ pass
+ with dag_maker("dag_count_2"):
+ pass
Review Comment:
This test creates DAGs via `dag_maker`, but `SerializedDagModel.get_count()`
counts rows in the `serialized_dag` table. Unless `dag_maker` (in this test
environment) also writes `SerializedDagModel` rows, the expected count of `2`
is not guaranteed and may be `0`, making the test incorrect/flaky. To make the
test deterministic, explicitly create serialized DAG entries (e.g., call the
appropriate `SerializedDagModel` write method for each created DAG, or insert
rows directly) and flush/commit before asserting the count.
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -875,6 +875,23 @@ def has_dag(cls, dag_id: str, session: Session =
NEW_SESSION) -> bool:
"""
return session.scalar(select(literal(True)).where(cls.dag_id ==
dag_id).limit(1)) is not None
+ @classmethod
+ @provide_session
+ def get_count(cls, session: Session = NEW_SESSION) -> int:
+ """
+ Return the total number of serialized DAGs stored in the database.
+
+ :param session: ORM Session
+ :raises RuntimeError: if the database returns None for the COUNT
query, which indicates
+ a transient connectivity issue rather than an empty table (COUNT
always returns an int).
+ """
+ result = session.scalar(select(func.count()).select_from(cls))
+ if result is None:
+ raise RuntimeError(
+ "COUNT query on serialized_dag returned None — possible
database connectivity issue"
Review Comment:
The error message contains a Unicode em dash (`—`). To avoid
encoding/rendering issues in some logging/terminal environments and to keep
messages consistent/easy to grep, consider using plain ASCII punctuation (e.g.,
`-`) in this exception string.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]