xBis7 commented on code in PR #68906:
URL: https://github.com/apache/airflow/pull/68906#discussion_r3466417764
##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -201,6 +204,76 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self,
testing_dag_bundle):
assert s_dag_2.data["dag"]["tags"] == ["example", "new_tag", "params"]
assert dag_updated is True
+ def test_serialization_metric_incremented_on_new_write(self,
testing_dag_bundle):
+ """A brand new serialized DAG write emits the ``dag.serialization``
metric."""
+ dag =
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+ with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="testing") is True
+
+ mock_stats.incr.assert_called_once_with(
+ "dag.serialization",
+ tags={"dag_id": dag.dag_id, "bundle_name": "testing"},
+ )
+
+ def test_serialization_metric_not_incremented_when_unchanged(self,
testing_dag_bundle):
+ """Re-writing an unchanged DAG must not emit the ``dag.serialization``
metric."""
+ dag =
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="testing") is True
+
+ with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="testing") is False
+
+ mock_stats.incr.assert_not_called()
+
+ def test_serialization_metric_incremented_on_inplace_update(self,
dag_maker, session):
+ """Updating a DAG version in place (no dag runs) emits the metric
once."""
+ with dag_maker("metric_dag") as dag:
+ PythonOperator(task_id="task1", python_callable=lambda: None)
+ # Change the DAG so the hash differs; with no dag runs this updates in
place.
+ PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)
+
+ with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="dag_maker") is True
Review Comment:
Half of the tests set `bundle_name` to `testing` and the other half to
`dag_maker`. This isn't a problem, but it is a bit confusing: a reader might
assume the test context differs between the two groups, and I don't think it
does.
##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -201,6 +204,76 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self,
testing_dag_bundle):
assert s_dag_2.data["dag"]["tags"] == ["example", "new_tag", "params"]
assert dag_updated is True
+ def test_serialization_metric_incremented_on_new_write(self,
testing_dag_bundle):
+ """A brand new serialized DAG write emits the ``dag.serialization``
metric."""
+ dag =
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+ with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="testing") is True
+
+ mock_stats.incr.assert_called_once_with(
+ "dag.serialization",
+ tags={"dag_id": dag.dag_id, "bundle_name": "testing"},
+ )
+
+ def test_serialization_metric_not_incremented_when_unchanged(self,
testing_dag_bundle):
+ """Re-writing an unchanged DAG must not emit the ``dag.serialization``
metric."""
+ dag =
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="testing") is True
+
+ with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="testing") is False
+
+ mock_stats.incr.assert_not_called()
+
+ def test_serialization_metric_incremented_on_inplace_update(self,
dag_maker, session):
+ """Updating a DAG version in place (no dag runs) emits the metric
once."""
+ with dag_maker("metric_dag") as dag:
+ PythonOperator(task_id="task1", python_callable=lambda: None)
+ # Change the DAG so the hash differs; with no dag runs this updates in
place.
+ PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)
+
+ with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="dag_maker") is True
+
+ assert session.scalar(select(func.count()).select_from(DagVersion)) ==
1
+ mock_stats.incr.assert_called_once_with(
+ "dag.serialization",
+ tags={"dag_id": "metric_dag", "bundle_name": "dag_maker"},
+ )
+
+ def test_serialization_metric_incremented_on_new_version(self, dag_maker,
session):
+ """Writing a new DAG version (existing run) emits the metric once."""
+ with dag_maker("metric_dag") as dag:
+ PythonOperator(task_id="task1", python_callable=lambda: None)
+ dag_maker.create_dagrun(run_id="run1",
logical_date=pendulum.datetime(2025, 1, 1))
+ PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)
+
+ with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="dag_maker") is True
+
+ assert session.scalar(select(func.count()).select_from(DagVersion)) ==
2
+ mock_stats.incr.assert_called_once_with(
+ "dag.serialization",
+ tags={"dag_id": "metric_dag", "bundle_name": "dag_maker"},
+ )
+
+
@mock.patch("airflow._shared.observability.metrics.stats._export_legacy_names",
True)
+ @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+ def test_serialization_metric_exports_new_and_legacy_names(self,
mock_get_backend, testing_dag_bundle):
+ """Serializing a DAG emits both the modern ``dag.serialization``
metric and its legacy name."""
+ mock_backend = mock.MagicMock(spec=StatsLogger)
+ mock_get_backend.return_value = mock_backend
+ dag =
make_example_dags(example_dags_module).get("example_params_trigger_ui")
+
+ assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag),
bundle_name="testing") is True
Review Comment:
Nit: `bundle_name` is also repeated at least twice in each test, so it could
be extracted into a shared variable (e.g. a class-level constant).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]