This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new a5faed920af fix dag version inflation caused by unmatched serialized result of task using reserialized command (#61077) (#66861)
a5faed920af is described below

commit a5faed920af8917710bcc15d6797188593450bb7
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 12:26:50 2026 +0530

    fix dag version inflation caused by unmatched serialized result of task using reserialized command (#61077) (#66861)
    
    Closes: #60868
    (cherry picked from commit c4252ba97b2b00bd7e92f0ad4a8dcfdd336ed849)
    
    Co-authored-by: Jeongwoo Do <[email protected]>
---
 airflow-core/src/airflow/models/serialized_dag.py  |  2 +-
 .../tests/unit/cli/commands/test_dag_command.py    | 30 ++++++++++++++++++
 .../tests/unit/dags/test_dag_reserialize.py        | 36 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index 8323c2228ff..be55c3e0908 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -383,7 +383,7 @@ class SerializedDagModel(Base):
         """Recursively sort json_dict and its nested dictionaries and lists."""
         if isinstance(serialized_dag, dict):
             return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
-        if isinstance(serialized_dag, list):
+        if isinstance(serialized_dag, (list, tuple)):
             if all(isinstance(i, dict) for i in serialized_dag):
                 if all(
                     isinstance(i.get("__var", {}), Iterable) and "task_id" in i.get("__var", {})
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 208fd9bb074..cb59144f107 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -25,6 +25,7 @@ from datetime import datetime, timedelta
 from unittest import mock
 from unittest.mock import MagicMock
 
+import msgspec
 import pendulum
 import pytest
 import time_machine
@@ -35,6 +36,7 @@ from airflow._shared.timezones import timezone
 from airflow.cli import cli_parser
 from airflow.cli.commands import dag_command
 from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
+from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess
 from airflow.exceptions import AirflowException
 from airflow.models import DagModel, DagRun
 from airflow.models.dagbag import DBDagBag
@@ -42,6 +44,8 @@ from airflow.models.serialized_dag import SerializedDagModel
 from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.sdk import BaseOperator, task
 from airflow.sdk.definitions.dag import _run_inline_trigger
+from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame
+from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG
 from airflow.triggers.base import TriggerEvent
 from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState
@@ -1070,3 +1074,29 @@ class TestCliDagsReserialize:
 
         serialized_dag_ids = set(session.execute(select(SerializedDagModel.dag_id)).scalars())
         assert serialized_dag_ids == {"test_example_bash_operator", "test_sensor"}
+
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_reserialize_should_make_equal_hash_with_dag_processor(self, configure_dag_bundles, session):
+        bundles = {"bundle_reserialize": TEST_DAGS_FOLDER / "test_dag_reserialize.py"}
+        with configure_dag_bundles(bundles):
+            dag_command.dag_reserialize(
+                self.parser.parse_args(["dags", "reserialize", "--bundle-name", "bundle_reserialize"])
+            )
+
+        dagbag = DagBag(bundles["bundle_reserialize"], bundle_path=bundles["bundle_reserialize"])
+        dag_parsing_result = DagFileParsingResult(
+            fileloc=bundles["bundle_reserialize"].name,
+            serialized_dags=[
+                LazyDeserializedDAG(data=DagSerialization.to_dict(dag)) for dag in dagbag.dags.values()
+            ],
+        )
+
+        frame = _ResponseFrame(id=0, body=dag_parsing_result.model_dump()).as_bytes()
+        request_frame = msgspec.msgpack.Decoder[_RequestFrame](_RequestFrame).decode(frame[4:])
+        dag_processor_parsing_result = DagFileProcessorProcess.decoder.validate_python(request_frame.body)
+
+        serialized_dag_hash = list(session.execute(select(SerializedDagModel.dag_hash)).scalars())
+
+        assert len(dag_processor_parsing_result.serialized_dags) == 1
+        assert len(serialized_dag_hash) == 1
+        assert dag_processor_parsing_result.serialized_dags[0].hash == serialized_dag_hash[0]
diff --git a/airflow-core/tests/unit/dags/test_dag_reserialize.py b/airflow-core/tests/unit/dags/test_dag_reserialize.py
new file mode 100644
index 00000000000..c9eba5ca3e1
--- /dev/null
+++ b/airflow-core/tests/unit/dags/test_dag_reserialize.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.providers.standard.operators.python import PythonOperator
+from airflow.sdk import DAG
+
+
+def empty_task():
+    pass
+
+
+with DAG(
+    "test_dag_reserialize",
+    start_date=datetime(2026, 1, 20),
+    schedule="* * * * *",
+    catchup=False,
+    max_active_runs=1,
+) as dag:
+    task_b = PythonOperator(task_id="bear", python_callable=empty_task)

Reply via email to