Copilot commented on code in PR #67572:
URL: https://github.com/apache/airflow/pull/67572#discussion_r3306644821


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1566,6 +1571,17 @@ def emit_metrics(*, parse_time: float, dag_file_stats: 
Sequence[DagFileStat]):
     stats.gauge("dag_processing.total_parse_time", parse_time)
     stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
     stats.gauge("dag_processing.import_errors", sum(stat.import_errors for 
stat in dag_file_stats))
+    # COUNT(*) on the serialized_dag table adds one DB round-trip per parse 
loop.
+    # This can be expensive on large deployments; throttling this metric is a
+    # straightforward follow-up if the round-trip becomes a bottleneck.
+    # On failure, an error counter is incremented so dashboards can alert on
+    # missing samples rather than silently showing a stale last value.
+    try:
+        with create_session() as session:
+            stats.gauge("serialized_dag.count", 
SerializedDagModel.get_count(session=session))
+    except SQLAlchemyError:
+        log.exception("Failed to emit serialized_dag.count metric")
+        stats.incr("serialized_dag.count_error")

Review Comment:
   This introduces a new `COUNT(*)` query and session creation on every parse 
loop iteration, which can become a measurable overhead at scale. Consider 
throttling/sampling (e.g., emit every N loops or every T seconds), or making it 
configurable so large deployments can disable/reduce the query cost while still 
keeping the metric available when desired.



##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -912,3 +912,28 @@ def 
test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund
 
         # The name must have been updated in the DB.
         assert updated_alert.name == "updated name"
+
+    def test_get_count_returns_zero_on_empty_table(self, session):
+        """get_count() returns 0 when no serialized DAGs are stored."""
+        db.clear_db_serialized_dags()
+        assert SDM.get_count(session=session) == 0
+
+    def test_get_count_returns_correct_value(self, dag_maker, session):
+        """get_count() returns the exact number of serialized DAGs in the 
table."""
+        baseline = SDM.get_count(session=session)
+        with dag_maker("dag_count_1"):
+            pass
+        with dag_maker("dag_count_2"):
+            pass
+        # dag_maker writes SerializedDagModel rows on context exit; flush to 
make
+        # them visible within the same session before asserting.
+        session.flush()
+        assert SDM.get_count(session=session) == baseline + 2
+
+    def test_get_count_propagates_db_error(self, session):
+        """get_count() lets OperationalError propagate so callers can handle 
DB failures."""
+        from sqlalchemy.exc import OperationalError
+
+        with mock.patch.object(session, "execute", 
side_effect=OperationalError("db failure", None, None)):

Review Comment:
   `OperationalError` is typically constructed with a DBAPI `orig` exception; 
passing `None` can lead to odd behavior in stringification/inspection in some 
SQLAlchemy versions. Prefer providing an `Exception(...)` (or a minimal 
DBAPI-like exception instance) as the `orig` argument to better match 
real-world error objects.
   



##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -2790,3 +2790,36 @@ def test_bundle_version_data_stored_after_refresh(self, 
session):
 
         assert manager._bundle_versions["mock_bundle"] == "newhash"
         assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+    """Tests for the emit_metrics module-level function."""
+
+    def test_emit_metrics_emits_serialized_dag_count(self):
+        """emit_metrics emits the serialized_dag.count gauge with the value 
from get_count."""
+        from airflow.dag_processing.manager import emit_metrics
+
+        with mock.patch(
+            "airflow.dag_processing.manager.SerializedDagModel.get_count", 
return_value=3
+        ):
+            with mock.patch("airflow.dag_processing.manager.stats") as 
mock_stats:
+                emit_metrics(parse_time=1.0, dag_file_stats=[])

Review Comment:
   `emit_metrics()` opens a real DB session via `create_session()` even though 
this test only cares about metric emission. To keep this a true unit test 
(faster and less environment-dependent), patch 
`airflow.dag_processing.manager.create_session` with a lightweight context 
manager stub so the test doesn’t require DB session creation.



##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -2790,3 +2790,36 @@ def test_bundle_version_data_stored_after_refresh(self, 
session):
 
         assert manager._bundle_versions["mock_bundle"] == "newhash"
         assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+    """Tests for the emit_metrics module-level function."""
+
+    def test_emit_metrics_emits_serialized_dag_count(self):
+        """emit_metrics emits the serialized_dag.count gauge with the value 
from get_count."""
+        from airflow.dag_processing.manager import emit_metrics
+
+        with mock.patch(
+            "airflow.dag_processing.manager.SerializedDagModel.get_count", 
return_value=3
+        ):
+            with mock.patch("airflow.dag_processing.manager.stats") as 
mock_stats:
+                emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+        mock_stats.gauge.assert_any_call("serialized_dag.count", 3)
+
+    def test_emit_metrics_logs_and_swallows_db_error(self):
+        """emit_metrics logs, increments error counter, and swallows 
SQLAlchemyError from get_count."""
+        from sqlalchemy.exc import SQLAlchemyError
+
+        from airflow.dag_processing.manager import emit_metrics
+
+        with mock.patch(
+            "airflow.dag_processing.manager.SerializedDagModel.get_count",
+            side_effect=SQLAlchemyError("db failure"),
+        ):
+            with mock.patch("airflow.dag_processing.manager.stats") as 
mock_stats:
+                with mock.patch("airflow.dag_processing.manager.log") as 
mock_log:
+                    emit_metrics(parse_time=1.0, dag_file_stats=[])

Review Comment:
   As with `test_emit_metrics_emits_serialized_dag_count`, this test still 
creates a real session via `create_session()` before the patched `get_count()` 
raises. Patch `airflow.dag_processing.manager.create_session` with a stub 
context manager so the test isolates the error-handling path without relying 
on DB session creation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to