This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 8a302d3fc1 Fix write processor_subdir in serialized_dag table (#35661)
8a302d3fc1 is described below

commit 8a302d3fc1c912b3d5f8c03d8124f58ff5a97e9d
Author: Tornike Gurgenidze <tokok...@gmail.com>
AuthorDate: Mon Nov 20 15:20:45 2023 +0400

    Fix write processor_subdir in serialized_dag table (#35661)

    * Fix write processor_subdir in serialized_dag table

    * Fix formatting and failing test for dagbag

    ---------

    Co-authored-by: Tornike Gurgenize <togurgeni...@bog.ge>
---
 airflow/models/dagbag.py    | 7 +++++--
 tests/models/test_dagbag.py | 4 +++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 2f709eb11a..ca81af8105 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -622,7 +622,7 @@ class DagBag(LoggingMixin):
 
         log = cls.logger()
 
-        def _serialize_dag_capturing_errors(dag, session):
+        def _serialize_dag_capturing_errors(dag, session, processor_subdir):
             """
             Try to serialize the dag to the DB, but make a note of any errors.
 
@@ -636,6 +636,7 @@ class DagBag(LoggingMixin):
                     dag,
                     min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
                     session=session,
+                    processor_subdir=processor_subdir,
                 )
                 if dag_was_updated:
                     DagBag._sync_perm_for_dag(dag, session=session)
@@ -665,7 +666,9 @@ class DagBag(LoggingMixin):
                 try:
                     # Write Serialized DAGs to DB, capturing errors
                     for dag in dags.values():
-                        serialize_errors.extend(_serialize_dag_capturing_errors(dag, session))
+                        serialize_errors.extend(
+                            _serialize_dag_capturing_errors(dag, session, processor_subdir)
+                        )
 
                     DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, session=session)
                 except OperationalError:
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 0004fb608f..2678d005c9 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -888,7 +888,9 @@ class TestDagBag:
         # and the session was roll-backed before even reaching 'SerializedDagModel.write_dag'
         mock_s10n_write_dag.assert_has_calls(
             [
-                mock.call(mock_dag, min_update_interval=mock.ANY, session=mock_session),
+                mock.call(
+                    mock_dag, min_update_interval=mock.ANY, processor_subdir=None, session=mock_session
+                ),
             ]
         )
 