This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a302d3fc1 Fix write processor_subdir in serialized_dag table (#35661)
8a302d3fc1 is described below

commit 8a302d3fc1c912b3d5f8c03d8124f58ff5a97e9d
Author: Tornike Gurgenidze <tokok...@gmail.com>
AuthorDate: Mon Nov 20 15:20:45 2023 +0400

    Fix write processor_subdir in serialized_dag table (#35661)
    
    * Fix write processor_subdir in serialized_dag table
    
    * Fix formatting and failing test for dagbag
    
    ---------
    
    Co-authored-by: Tornike Gurgenize <togurgeni...@bog.ge>
---
 airflow/models/dagbag.py    | 7 +++++--
 tests/models/test_dagbag.py | 4 +++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 2f709eb11a..ca81af8105 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -622,7 +622,7 @@ class DagBag(LoggingMixin):
 
         log = cls.logger()
 
-        def _serialize_dag_capturing_errors(dag, session):
+        def _serialize_dag_capturing_errors(dag, session, processor_subdir):
             """
             Try to serialize the dag to the DB, but make a note of any errors.
 
@@ -636,6 +636,7 @@ class DagBag(LoggingMixin):
                     dag,
                     min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
                     session=session,
+                    processor_subdir=processor_subdir,
                 )
                 if dag_was_updated:
                     DagBag._sync_perm_for_dag(dag, session=session)
@@ -665,7 +666,9 @@ class DagBag(LoggingMixin):
                 try:
                     # Write Serialized DAGs to DB, capturing errors
                     for dag in dags.values():
-                        serialize_errors.extend(_serialize_dag_capturing_errors(dag, session))
+                        serialize_errors.extend(
+                            _serialize_dag_capturing_errors(dag, session, processor_subdir)
+                        )
 
                     DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, session=session)
                 except OperationalError:
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 0004fb608f..2678d005c9 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -888,7 +888,9 @@ class TestDagBag:
         # and the session was roll-backed before even reaching 'SerializedDagModel.write_dag'
         mock_s10n_write_dag.assert_has_calls(
             [
-                mock.call(mock_dag, min_update_interval=mock.ANY, session=mock_session),
+                mock.call(
+                    mock_dag, min_update_interval=mock.ANY, processor_subdir=None, session=mock_session
+                ),
             ]
         )
 

Reply via email to