This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f5951e2f24f39ae0c6877dd978e01151e7247393 Author: Kaxil Naik <[email protected]> AuthorDate: Tue Sep 23 12:47:44 2025 +0100 Handle Serialized DAG Format from v3 to v2 when downgrading Airflow (#55975) This migration enables Airflow downgrades by converting v3 serialized DAGs back to v2 format. The `upgrade()` is a no-op since the server handles v1/v2/v3 at runtime, but `downgrade()` removes client_defaults sections and updates version numbers to ensure compatibility with older Airflow versions. closes https://github.com/apache/airflow/issues/55949 (cherry picked from commit 32995675d03f3d5176495829d9061e0a47811eec) --- airflow-core/docs/img/airflow_erd.sha256 | 2 +- airflow-core/docs/migrations-ref.rst | 5 +- ...3_1_0_downgrade_serialized_dag_version_to_v2.py | 199 +++++++++++++++++++++ airflow-core/src/airflow/utils/db.py | 2 +- 4 files changed, 205 insertions(+), 3 deletions(-) diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index 9274a23fed6..6162bcff312 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -b4de080500173a0c42be078718a969867ae00a5ffd4c68e18d7bacb7e5bb7fd1 \ No newline at end of file +db7901e22801d299714b84b9a676081b12bc6247de807cb2469f3c59bdabcfad \ No newline at end of file diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 1e0c886097e..c8569e667be 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``eaf332f43c7c`` (head) | ``a3c7f2b18d4e`` | ``3.1.0`` | add last_parse_duration to dag model. | +| ``cc92b33c6709`` (head) | ``eaf332f43c7c`` | ``3.1.0`` | Add backward compatibility for serialized DAG format v3 to | +| | | | v2. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``eaf332f43c7c`` | ``a3c7f2b18d4e`` | ``3.1.0`` | add last_parse_duration to dag model. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``a3c7f2b18d4e`` | ``7582ea3f3dd5`` | ``3.1.0`` | Add tables to store teams and associations with dag bundles. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/airflow-core/src/airflow/migrations/versions/0085_3_1_0_downgrade_serialized_dag_version_to_v2.py b/airflow-core/src/airflow/migrations/versions/0085_3_1_0_downgrade_serialized_dag_version_to_v2.py new file mode 100644 index 00000000000..3a5045dda50 --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0085_3_1_0_downgrade_serialized_dag_version_to_v2.py @@ -0,0 +1,199 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add backward compatibility for serialized DAG format v3 to v2. + +Revision ID: cc92b33c6709 +Revises: eaf332f43c7c +Create Date: 2025-09-22 22:50:48.035121 + +""" + +from __future__ import annotations + +from textwrap import dedent + +import sqlalchemy as sa +from alembic import context, op + +# revision identifiers, used by Alembic. +revision = "cc92b33c6709" +down_revision = "eaf332f43c7c" +branch_labels = None +depends_on = None +airflow_version = "3.1.0" + + +def upgrade(): + """Apply Downgrade Serialized Dag version to v2.""" + # No-op: Server handles v1/v2/v3 DAGs at runtime via conversion functions + pass + + +def downgrade(): + """Convert v3 serialized DAGs back to v2 format for compatibility with older Airflow versions.""" + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- Manual v3 to v2 DAG conversion required (offline mode) + -- + -- PostgreSQL: + -- UPDATE serialized_dag SET data = jsonb_set((data::jsonb - 'client_defaults'), '{__version}', '2')::json + -- WHERE id IN (SELECT id FROM serialized_dag WHERE data->>'__version' = '3' AND data_compressed IS NULL); + -- + -- MySQL/SQLite: + -- UPDATE serialized_dag SET data = JSON_SET(JSON_REMOVE(data, '$.client_defaults'), '$.__version', 2) + -- WHERE JSON_EXTRACT(data, '$.__version') = '3' AND data_compressed IS NULL; + -- + -- For compressed DAGs: run online migration. + ------------ + """) + ) + return + + import gzip + import json + + connection = op.get_bind() + dialect = connection.dialect.name + + if dialect == "postgresql": + # PostgreSQL - pre-filter v3 DAGs to avoid parsing all rows + connection.execute( + sa.text(""" + UPDATE serialized_dag + SET data = jsonb_set( + (data::jsonb - 'client_defaults'), + '{__version}', + '2' + )::json + WHERE id IN ( + SELECT id FROM serialized_dag + WHERE data->>'__version' = '3' + AND data_compressed IS NULL + ) + """) + ) + elif dialect == "mysql": + connection.execute( + sa.text(""" + UPDATE serialized_dag + SET data = JSON_SET( + JSON_REMOVE(data, '$.client_defaults'), + '$.__version', + 2 + ) + WHERE JSON_EXTRACT(data, '$.__version') = '3' + AND data_compressed IS NULL + """) + ) + else: + json_functions_available = False + try: + connection.execute(sa.text("SELECT JSON_SET('{}', '$.test', 'value')")).fetchone() + json_functions_available = True + print("SQLite JSON functions detected, using optimized SQL approach") + except Exception: + print("SQLite JSON functions not available, using Python fallback for JSON processing") + + if json_functions_available: + connection.execute( + sa.text(""" + UPDATE serialized_dag + SET data = JSON_SET( + JSON_REMOVE(data, '$.client_defaults'), + '$.__version', + 2 + ) + WHERE JSON_EXTRACT(data, '$.__version') = '3' + AND data_compressed IS NULL + """) + ) + else: + result = connection.execute( + sa.text(""" + SELECT id, data + FROM serialized_dag + WHERE data_compressed IS NULL + """) + ) + + for row in result: + dag_id, data_json = row + try: + if data_json is None: + continue + + dag_data = json.loads(data_json) + + if dag_data.get("__version") != 3: + continue + + if "client_defaults" in dag_data: + del dag_data["client_defaults"] + dag_data["__version"] = 2 + + new_json = json.dumps(dag_data) + connection.execute( + sa.text("UPDATE serialized_dag SET data = :data WHERE id = :id"), + {"data": new_json, "id": dag_id}, + ) + + except Exception as e: + print(f"Failed to downgrade uncompressed DAG {dag_id}: {e}") + continue + try: + result = connection.execute( + sa.text(""" + SELECT id, data_compressed + FROM serialized_dag + WHERE data_compressed IS NOT NULL + """) + ) + + for row in result: + dag_id, compressed_data = row + try: + if compressed_data is None: + continue + + decompressed = gzip.decompress(compressed_data) + dag_data = json.loads(decompressed) + + if dag_data.get("__version") != 3: + continue + + if "client_defaults" in dag_data: + del dag_data["client_defaults"] + dag_data["__version"] = 2 + + new_compressed = gzip.compress(json.dumps(dag_data).encode("utf-8")) + connection.execute( + sa.text("UPDATE serialized_dag SET data_compressed = :data WHERE id = :id"), + {"data": new_compressed, "id": dag_id}, + ) + + except Exception as e: + print(f"Failed to downgrade compressed DAG {dag_id}: {e}") + continue + + except Exception as e: + print(f"Failed to process compressed DAGs during downgrade: {e}") + raise diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 2cfb7a4eb20..11bafc01c25 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -110,7 +110,7 @@ _REVISION_HEADS_MAP: dict[str, str] = { "2.10.3": "5f2621c13b39", "3.0.0": "29ce7909c52b", "3.0.3": "fe199e1abd77", - "3.1.0": "eaf332f43c7c", + "3.1.0": "cc92b33c6709", }
