xBis7 commented on code in PR #68906:
URL: https://github.com/apache/airflow/pull/68906#discussion_r3466417764


##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -201,6 +204,76 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self, 
testing_dag_bundle):
         assert s_dag_2.data["dag"]["tags"] == ["example", "new_tag", "params"]
         assert dag_updated is True
 
+    def test_serialization_metric_incremented_on_new_write(self, 
testing_dag_bundle):
+        """A brand new serialized DAG write emits the ``dag.serialization`` 
metric."""
+        dag = 
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+        with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+            assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="testing") is True
+
+        mock_stats.incr.assert_called_once_with(
+            "dag.serialization",
+            tags={"dag_id": dag.dag_id, "bundle_name": "testing"},
+        )
+
+    def test_serialization_metric_not_incremented_when_unchanged(self, 
testing_dag_bundle):
+        """Re-writing an unchanged DAG must not emit the ``dag.serialization`` 
metric."""
+        dag = 
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+        assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="testing") is True
+
+        with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+            assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="testing") is False
+
+        mock_stats.incr.assert_not_called()
+
+    def test_serialization_metric_incremented_on_inplace_update(self, 
dag_maker, session):
+        """Updating a DAG version in place (no dag runs) emits the metric 
once."""
+        with dag_maker("metric_dag") as dag:
+            PythonOperator(task_id="task1", python_callable=lambda: None)
+        # Change the DAG so the hash differs; with no dag runs this updates in 
place.
+        PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)
+
+        with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+            assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="dag_maker") is True

Review Comment:
   Half of the tests set `bundle_name` to `testing` and the other half to 
`dag_maker`. This isn't an issue, but it's a bit confusing: someone might 
think the context changes between the tests, and I don't think that's the case.



##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -201,6 +204,76 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self, 
testing_dag_bundle):
         assert s_dag_2.data["dag"]["tags"] == ["example", "new_tag", "params"]
         assert dag_updated is True
 
+    def test_serialization_metric_incremented_on_new_write(self, 
testing_dag_bundle):
+        """A brand new serialized DAG write emits the ``dag.serialization`` 
metric."""
+        dag = 
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+        with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+            assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="testing") is True
+
+        mock_stats.incr.assert_called_once_with(
+            "dag.serialization",
+            tags={"dag_id": dag.dag_id, "bundle_name": "testing"},
+        )
+
+    def test_serialization_metric_not_incremented_when_unchanged(self, 
testing_dag_bundle):
+        """Re-writing an unchanged DAG must not emit the ``dag.serialization`` 
metric."""
+        dag = 
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+        assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="testing") is True
+
+        with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+            assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="testing") is False
+
+        mock_stats.incr.assert_not_called()
+
+    def test_serialization_metric_incremented_on_inplace_update(self, 
dag_maker, session):
+        """Updating a DAG version in place (no dag runs) emits the metric 
once."""
+        with dag_maker("metric_dag") as dag:
+            PythonOperator(task_id="task1", python_callable=lambda: None)
+        # Change the DAG so the hash differs; with no dag runs this updates in 
place.
+        PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)
+
+        with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+            assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="dag_maker") is True
+
+        assert session.scalar(select(func.count()).select_from(DagVersion)) == 
1
+        mock_stats.incr.assert_called_once_with(
+            "dag.serialization",
+            tags={"dag_id": "metric_dag", "bundle_name": "dag_maker"},
+        )
+
+    def test_serialization_metric_incremented_on_new_version(self, dag_maker, 
session):
+        """Writing a new DAG version (existing run) emits the metric once."""
+        with dag_maker("metric_dag") as dag:
+            PythonOperator(task_id="task1", python_callable=lambda: None)
+        dag_maker.create_dagrun(run_id="run1", 
logical_date=pendulum.datetime(2025, 1, 1))
+        PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)
+
+        with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+            assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="dag_maker") is True
+
+        assert session.scalar(select(func.count()).select_from(DagVersion)) == 
2
+        mock_stats.incr.assert_called_once_with(
+            "dag.serialization",
+            tags={"dag_id": "metric_dag", "bundle_name": "dag_maker"},
+        )
+
+    
@mock.patch("airflow._shared.observability.metrics.stats._export_legacy_names", 
True)
+    @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+    def test_serialization_metric_exports_new_and_legacy_names(self, 
mock_get_backend, testing_dag_bundle):
+        """Serializing a DAG emits both the modern ``dag.serialization`` 
metric and its legacy name."""
+        mock_backend = mock.MagicMock(spec=StatsLogger)
+        mock_get_backend.return_value = mock_backend
+        dag = 
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+
+        assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="testing") is True

Review Comment:
   Nit: `bundle_name` is also used at least twice in each test, so it could 
be a common variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to