Copilot commented on code in PR #67572:
URL: https://github.com/apache/airflow/pull/67572#discussion_r3306538333
##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -912,3 +912,27 @@ def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund
# The name must have been updated in the DB.
assert updated_alert.name == "updated name"
+
+ def test_get_count_returns_zero_on_empty_table(self, session):
+ """get_count() returns 0 when no serialized DAGs are stored."""
Review Comment:
This test assumes the `serialized_dag` table is empty but doesn’t actively
ensure it (e.g., by clearing the table within the test). To make the test
order-independent and more robust across fixture changes, consider explicitly
deleting/truncating `SerializedDagModel` rows at the start of the test (or
using a fixture that guarantees an empty table for this model).
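
A minimal sketch of the suggestion above, assuming an in-memory SQLite database. `SerializedDagRow` and this `get_count` are hypothetical stand-ins for `SerializedDagModel` and its new method, not the Airflow code. The check deletes all rows before asserting, so it still passes when an earlier test left data behind:

```python
from sqlalchemy import Column, Integer, String, create_engine, delete, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class SerializedDagRow(Base):
    """Hypothetical stand-in for SerializedDagModel (illustration only)."""

    __tablename__ = "serialized_dag"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String, nullable=False)


def get_count(session):
    """Assumed shape of get_count(): a COUNT(*) read with scalar_one()."""
    return session.execute(select(func.count()).select_from(SerializedDagRow)).scalar_one()


def check_get_count_returns_zero_on_empty_table(session):
    # Empty the table explicitly instead of trusting fixture or test order.
    session.execute(delete(SerializedDagRow))
    session.flush()
    assert get_count(session) == 0


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Simulate a row left behind by a previously executed test.
    session.add(SerializedDagRow(dag_id="leftover"))
    session.flush()
    count_before = get_count(session)
    check_get_count_returns_zero_on_empty_table(session)
    count_after = get_count(session)
```

In the real test suite the same cleanup could live in a fixture shared by every test that relies on an empty table.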
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -875,6 +875,21 @@ def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool:
"""
return session.scalar(select(literal(True)).where(cls.dag_id == dag_id).limit(1)) is not None
+ @classmethod
+ @provide_session
+ def get_count(cls, session: Session = NEW_SESSION) -> int:
+ """
+ Return the total number of serialized DAGs stored in the database.
+
+ Uses ``scalar_one()`` so that a DB connectivity failure surfaces as a
+ ``SQLAlchemyError`` rather than a silent ``None`` return. ``COUNT(*)``
+ always produces exactly one row, so ``NoResultFound`` is never raised
+ under normal conditions.
Review Comment:
The docstring rationale is misleading: DB connectivity failures will raise
regardless of `scalar()` vs `scalar_one()`, so the “silent `None` return” claim
isn’t accurate here. Consider updating the docstring to justify `scalar_one()`
in terms of enforcing exactly-one-row semantics (and raising if the query shape
changes unexpectedly), rather than connectivity behavior.
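
To illustrate the distinction, here is a small sketch against an in-memory SQLite database; the table layout and the `missing_table` name are assumptions made for the demo. It shows that `scalar()` and `scalar_one()` agree on a `COUNT(*)` query and differ only when the query yields no row. A database-level failure (simulated here with a missing table rather than a real connectivity loss) is raised by `execute()` itself, before either accessor runs:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import NoResultFound, OperationalError

engine = create_engine("sqlite:///:memory:")

with engine.connect() as conn:
    conn.execute(text("CREATE TABLE serialized_dag (id INTEGER PRIMARY KEY, dag_id TEXT)"))

    # COUNT(*) always yields exactly one row, so both accessors return 0 here.
    count_query = text("SELECT COUNT(*) FROM serialized_dag")
    count_via_scalar = conn.execute(count_query).scalar()
    count_via_scalar_one = conn.execute(count_query).scalar_one()

    # A query that yields no rows is where the two accessors diverge.
    rows_query = text("SELECT dag_id FROM serialized_dag")
    empty_via_scalar = conn.execute(rows_query).scalar()  # None
    try:
        conn.execute(rows_query).scalar_one()
        raised_no_result = False
    except NoResultFound:
        raised_no_result = True

    # A DB-level error is raised by execute() before the accessor is reached.
    failed_for = []
    for accessor in ("scalar", "scalar_one"):
        try:
            result = conn.execute(text("SELECT COUNT(*) FROM missing_table"))
            getattr(result, accessor)()
        except OperationalError:
            failed_for.append(accessor)
```

So the practical benefit of `scalar_one()` in `get_count()` is the exactly-one-row guarantee, not different behavior on database errors.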
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1566,6 +1571,16 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]):
stats.gauge("dag_processing.total_parse_time", parse_time)
stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats))
+ # COUNT(*) on the serialized_dag table adds one DB round-trip per parse loop.
+ # This can be expensive on large deployments (query plan is DB-dependent and
+ # may involve a full table scan). The call is isolated so that a transient
+ # DB error never kills the parse loop; throttling this metric in the future
+ # is a straightforward follow-up if the round-trip becomes a bottleneck.
+ try:
+ with create_session() as session:
+ stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session))
+ except SQLAlchemyError:
+ log.exception("Failed to emit serialized_dag.count metric")
Review Comment:
Catching `SQLAlchemyError` is very broad and can also swallow non-transient
issues (e.g., programming/config/schema errors) that you might prefer to
surface/test-fail rather than silently skip emitting the metric. Consider
narrowing the exception to DB connectivity/DBAPI-related errors (e.g.,
`DBAPIError`/`OperationalError`) if the intent is specifically “don’t fail the
parse loop on transient DB failures,” while still letting unexpected SQLAlchemy
usage errors be noticed earlier.
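
A rough sketch of the narrower handling, using a hypothetical helper `emit_count_gauge` and stubbed count functions rather than the real `emit_metrics` code. Only `OperationalError` is caught and logged; an `InvalidRequestError`, used here to simulate a SQLAlchemy usage error, propagates to the caller. Which exception class a given failure maps to depends on the database driver, so the exact class to catch is an assumption to verify per backend:

```python
import logging

from sqlalchemy.exc import InvalidRequestError, OperationalError

log = logging.getLogger(__name__)


def emit_count_gauge(get_count, gauge):
    """Hypothetical helper: emit the gauge, tolerating only operational DB errors."""
    try:
        gauge("serialized_dag.count", get_count())
    except OperationalError:
        # Connection drops, timeouts and similar failures: log and move on.
        log.exception("Failed to emit serialized_dag.count metric")


emitted = {}


def record_gauge(name, value):
    # Stand-in for stats.gauge that records the values it receives.
    emitted[name] = value


def count_ok():
    return 3


def count_db_down():
    # Simulated operational failure wrapped the way SQLAlchemy wraps DBAPI errors.
    raise OperationalError("SELECT COUNT(*)", {}, Exception("connection refused"))


def count_usage_bug():
    # Simulated SQLAlchemy usage error that should not be silently skipped.
    raise InvalidRequestError("simulated usage error")


emit_count_gauge(count_ok, record_gauge)
emit_count_gauge(count_db_down, record_gauge)  # logged, not re-raised

try:
    emit_count_gauge(count_usage_bug, record_gauge)
    usage_error_propagated = False
except InvalidRequestError:
    usage_error_propagated = True
```

The trade-off is that a narrower `except` lets unexpected errors reach the parse loop, which is the behavior the comment above argues for.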