This is an automated email from the ASF dual-hosted git repository.

phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 5f07a596e0f  Run migration test from AF2.10.5 to 3.0 (#48512)
5f07a596e0f is described below

commit 5f07a596e0ffad8030539677bb7365f0e6011f24
Author:     Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Tue Apr 8 12:04:08 2025 +0100

    Run migration test from AF2.10.5 to 3.0 (#48512)

    * Run migration test from AF2.10.5 to 3.0

    Improve the testing framework to validate database migrations from
    Airflow 2.10.5 to 3.0, with verification in CI. This ensures a smooth
    upgrade path for users.

    Key fixes/improvements:

    - Fix foreign key inconsistencies in the dataset-to-asset migration
    - Add support for the different constraint naming patterns between
      ORM-created and migration-created databases observed in Airflow 2
    - Handle database-specific edge cases for PostgreSQL and MySQL

    This ensures databases can be successfully migrated regardless of whether
    they were initially created through the ORM or through migration files.

    closes: #45479
---
 .github/actions/migration_tests/action.yml           |  47 +++++++--
 airflow-core/docs/img/airflow_erd.sha256              |   2 +-
 .../versions/0041_3_0_0_rename_dataset_as_asset.py    | 109 ++++++++++++++++++---
 .../versions/0047_3_0_0_add_dag_versioning.py         |  13 +++
 4 files changed, 149 insertions(+), 22 deletions(-)
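The constraint-name mismatch called out in the commit message comes from the two ways an Airflow 2 database may have been created. The sketch below is a minimal SQLAlchemy illustration, not Airflow's actual model definitions: the same table ends up with either the convention-derived name dataset_dag_run_queue_pkey (database created through the ORM) or the hard-coded name datasetdagrunqueue_pkey (database built by replaying the old migration files), which is why the 0041 migration in this patch has to try both names before creating assetdagrunqueue_pkey.

    import sqlalchemy as sa

    # Database created through the ORM (e.g. `airflow db reset`): constraint names
    # are derived from a naming convention, yielding "dataset_dag_run_queue_pkey".
    orm_metadata = sa.MetaData(naming_convention={"pk": "%(table_name)s_pkey"})
    sa.Table(
        "dataset_dag_run_queue",
        orm_metadata,
        sa.Column("dataset_id", sa.Integer, primary_key=True),
        sa.Column("target_dag_id", sa.String(250), primary_key=True),
    )

    # Database built by replaying old migration files: the primary key carries
    # whatever name the migration hard-coded, e.g. "datasetdagrunqueue_pkey".
    migration_metadata = sa.MetaData()
    sa.Table(
        "dataset_dag_run_queue",
        migration_metadata,
        sa.Column("dataset_id", sa.Integer, nullable=False),
        sa.Column("target_dag_id", sa.String(250), nullable=False),
        sa.PrimaryKeyConstraint("dataset_id", "target_dag_id", name="datasetdagrunqueue_pkey"),
    )

Exercising the 2-to-3 migration chain against both kinds of database is exactly what the new CI steps in the patch below do.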
diff --git a/.github/actions/migration_tests/action.yml b/.github/actions/migration_tests/action.yml
index 32f77bea198..ed71e21407d 100644
--- a/.github/actions/migration_tests/action.yml
+++ b/.github/actions/migration_tests/action.yml
@@ -21,29 +21,56 @@ description: 'Runs migration tests'
 runs:
   using: "composite"
   steps:
-    - name: "Test downgrade migration file ${{env.BACKEND}}"
+    - name: "Test migration file 2 to 3 migration: ${{env.BACKEND}}"
       shell: bash
-      run: >
-        breeze shell "airflow db reset --skip-init -y &&
-        airflow db migrate --to-revision heads &&
-        airflow db downgrade -r 937cbd173ca1 -y &&
-        airflow db migrate"
+      run: |
+        breeze shell "${{ env.AIRFLOW_2_CMD }}" --use-airflow-version 2.10.5 --answer y &&
+        breeze shell "${{ env.AIRFLOW_3_CMD }}" --no-db-cleanup
       env:
         COMPOSE_PROJECT_NAME: "docker-compose"
         AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
+        DB_RESET: "false"
+        AIRFLOW_2_CMD: >-
+          airflow db reset --skip-init -y &&
+          airflow db migrate --to-revision heads
+        AIRFLOW_3_CMD: >-
+          airflow db migrate --to-revision heads &&
+          airflow db downgrade -n 2.7.0 -y &&
+          airflow db migrate
+      if: env.BACKEND != 'sqlite'
     - name: "Bring composer down"
       shell: bash
       run: breeze down
       env:
         COMPOSE_PROJECT_NAME: "docker-compose"
-    - name: "Test downgrade ORM ${{env.BACKEND}}"
+    - name: "Test ORM migration 2 to 3: ${{env.BACKEND}}"
+      shell: bash
+      run: >
+        breeze shell "${{ env.AIRFLOW_2_CMD }}" --use-airflow-version 2.10.5 --answer y &&
+        breeze shell "${{ env.AIRFLOW_3_CMD }}" --no-db-cleanup
+      env:
+        COMPOSE_PROJECT_NAME: "docker-compose"
+        AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
+        DB_RESET: "false"
+        AIRFLOW_2_CMD: >-
+          airflow db reset -y
+        AIRFLOW_3_CMD: >-
+          airflow db migrate --to-revision heads &&
+          airflow db downgrade -n 2.7.0 -y &&
+          airflow db migrate
+      if: env.BACKEND != 'sqlite'
+    - name: "Bring compose down again"
+      shell: bash
+      run: breeze down
+      env:
+        COMPOSE_PROJECT_NAME: "docker-compose"
+    - name: "Test ORM migration ${{env.BACKEND}}"
       shell: bash
       run: >
         breeze shell "airflow db reset -y &&
-        airflow db migrate &&
-        airflow db downgrade -r 22ed7efa9da2 -y &&
+        airflow db migrate --to-revision heads &&
+        airflow db downgrade -n 2.7.0 -y &&
         airflow db migrate"
-      COMPOSE_PROJECT_NAME: "docker-compose"
       env:
         COMPOSE_PROJECT_NAME: "docker-compose"
         AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256
index 3d2e9716ff6..b53496eee22 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-aa423e4d2c250ceca76fd28b7fd788fc476f7e235326c187b6804916853e6ebf
\ No newline at end of file
+11cbcf4e71543241568358521f85d6b04d7aa51c862986404092ff05fc31bda4
\ No newline at end of file
diff --git a/airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py b/airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py
index 6c7b20f3816..4604f0f149c 100644
--- a/airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py
+++ b/airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py
@@ -32,6 +32,7 @@ import sqlalchemy as sa
 import sqlalchemy_jsonfield
 from alembic import op
 
+from airflow.migrations.utils import mysql_drop_foreignkey_if_exists
 from airflow.settings import json
 
 # revision identifiers, used by Alembic.
@@ -73,6 +74,24 @@ def _rename_fk_constraint(
     )
 
 
+def _rename_pk_constraint_unkown(
+    *,
+    batch_op: BatchOperations,
+    table_name: str,
+    original_name: str,
+    alternative_name: str,
+    new_name: str,
+    columns: list[str],
+) -> None:
+    dialect = op.get_bind().dialect.name
+    if dialect == "postgresql":
+        op.execute(f"ALTER TABLE {table_name} DROP CONSTRAINT IF EXISTS {original_name}")
+        op.execute(f"ALTER TABLE {table_name} DROP CONSTRAINT IF EXISTS {alternative_name}")
+    elif dialect == "mysql":
+        op.execute(f"ALTER TABLE {table_name} DROP PRIMARY KEY")
+    batch_op.create_primary_key(constraint_name=new_name, columns=columns)
+
+
 def _rename_pk_constraint(
     *, batch_op: BatchOperations, original_name: str, new_name: str, columns: list[str]
 ) -> None:
@@ -98,6 +117,7 @@ table_name_mappings = (
 
 def upgrade():
     """Rename dataset as asset."""
+    dialect = op.get_bind().dialect.name
     # Rename tables
     for original_name, new_name in table_name_mappings:
         op.rename_table(original_name, new_name)
@@ -125,6 +145,14 @@ def upgrade():
 
     with op.batch_alter_table("asset_alias_asset", schema=None) as batch_op:
         batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False)
+        _rename_pk_constraint_unkown(
+            batch_op=batch_op,
+            table_name="asset_alias_asset",
+            original_name="dataset_alias_dataset_pkey",
+            alternative_name="asset_alias_asset_pkey",
+            new_name="asset_alias_asset_pkey",
+            columns=["alias_id", "asset_id"],
+        )
 
     with op.batch_alter_table("asset_alias_asset", schema=None) as batch_op:
         batch_op.drop_constraint(op.f("dataset_alias_dataset_alias_id_fkey"), type_="foreignkey")
@@ -135,6 +163,7 @@ def upgrade():
             columns=["alias_id"],
             unique=False,
         )
+
         batch_op.create_foreign_key(
             constraint_name="asset_alias_asset_alias_id_fkey",
             referent_table="asset_alias",
             local_cols=["alias_id"],
             remote_cols=["id"],
             ondelete="CASCADE",
         )
 
         batch_op.drop_constraint(op.f("dataset_alias_dataset_dataset_id_fkey"), type_="foreignkey")
+        if dialect == "postgresql":
+            op.execute("ALTER TABLE asset_alias_asset DROP CONSTRAINT IF EXISTS ds_dsa_alias_id")
+            op.execute("ALTER TABLE asset_alias_asset DROP CONSTRAINT IF EXISTS ds_dsa_dataset_id")
+        elif dialect == "mysql":
+            mysql_drop_foreignkey_if_exists("ds_dsa_alias_id", "asset_alias_asset", op)
+            mysql_drop_foreignkey_if_exists("ds_dsa_dataset_id", "asset_alias_asset", op)
mysql_drop_foreignkey_if_exists("ds_dsa_dataset_id", "asset_alias_asset", op) _rename_index( batch_op=batch_op, original_name="idx_dataset_alias_dataset_alias_dataset_id", @@ -151,6 +186,7 @@ def upgrade(): columns=["asset_id"], unique=False, ) + batch_op.create_foreign_key( constraint_name="asset_alias_asset_asset_id_fkey", referent_table="asset", @@ -177,6 +213,13 @@ def upgrade(): ) batch_op.drop_constraint(op.f("dataset_alias_dataset_event_event_id_fkey"), type_="foreignkey") + if dialect == "postgresql": + op.execute("ALTER TABLE asset_alias_asset_event DROP CONSTRAINT IF EXISTS dss_de_alias_id") + op.execute("ALTER TABLE asset_alias_asset_event DROP CONSTRAINT IF EXISTS dss_de_event_id") + elif dialect == "mysql": + mysql_drop_foreignkey_if_exists("dss_de_alias_id", "asset_alias_asset_event", op) + mysql_drop_foreignkey_if_exists("dss_de_event_id", "asset_alias_asset_event", op) + _rename_index( batch_op=batch_op, original_name="idx_dataset_alias_dataset_event_event_id", @@ -233,10 +276,12 @@ def upgrade(): batch_op.drop_constraint("dsdr_dag_id_fkey", type_="foreignkey") if op.get_bind().dialect.name in ("postgresql", "mysql"): batch_op.drop_constraint("dsdr_dataset_fkey", type_="foreignkey") - - _rename_pk_constraint( + with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op: + _rename_pk_constraint_unkown( batch_op=batch_op, - original_name="dsdr_pkey", + table_name="dag_schedule_asset_reference", + original_name="dag_schedule_dataset_reference_pkey", + alternative_name="dsdr_pkey", new_name="dsar_pkey", columns=["asset_id", "dag_id"], ) @@ -269,10 +314,12 @@ def upgrade(): batch_op.drop_constraint("todr_dag_id_fkey", type_="foreignkey") if op.get_bind().dialect.name in ("postgresql", "mysql"): batch_op.drop_constraint("todr_dataset_fkey", type_="foreignkey") - - _rename_pk_constraint( + with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op: + _rename_pk_constraint_unkown( batch_op=batch_op, - original_name="todr_pkey", + table_name="task_outlet_asset_reference", + original_name="task_outlet_dataset_reference_pkey", + alternative_name="todr_pkey", new_name="toar_pkey", columns=["asset_id", "dag_id", "task_id"], ) @@ -301,9 +348,12 @@ def upgrade(): if op.get_bind().dialect.name in ("postgresql", "mysql"): batch_op.drop_constraint("ddrq_dataset_fkey", type_="foreignkey") - _rename_pk_constraint( + with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op: + _rename_pk_constraint_unkown( batch_op=batch_op, - original_name="datasetdagrunqueue_pkey", + table_name="asset_dag_run_queue", + original_name="dataset_dag_run_queue_pkey", + alternative_name="datasetdagrunqueue_pkey", new_name="assetdagrunqueue_pkey", columns=["asset_id", "target_dag_id"], ) @@ -325,7 +375,19 @@ def upgrade(): ) with op.batch_alter_table("dagrun_asset_event", schema=None) as batch_op: - batch_op.drop_constraint("dagrun_dataset_event_event_id_fkey", type_="foreignkey") + if dialect == "postgresql": + op.execute( + "ALTER TABLE dagrun_asset_event DROP CONSTRAINT IF EXISTS dagrun_dataset_events_event_id_fkey" + ) + op.execute( + "ALTER TABLE dagrun_asset_event DROP CONSTRAINT IF EXISTS dagrun_dataset_event_event_id_fkey" + ) + elif dialect == "mysql": + mysql_drop_foreignkey_if_exists("dagrun_dataset_events_event_id_fkey", "dagrun_asset_event", op) + mysql_drop_foreignkey_if_exists("dagrun_dataset_event_event_id_fkey", "dagrun_asset_event", op) + else: + # sqlite: Assuming no upgrade for sqlite from Airflow 2 + 
batch_op.drop_constraint("dagrun_dataset_event_event_id_fkey", type_="foreignkey") _rename_index( batch_op=batch_op, original_name="idx_dagrun_dataset_events_dag_run_id", @@ -340,8 +402,19 @@ def upgrade(): remote_cols=["id"], ondelete="CASCADE", ) - - batch_op.drop_constraint("dagrun_dataset_event_dag_run_id_fkey", type_="foreignkey") + if dialect == "postgresql": + op.execute( + "ALTER TABLE dagrun_asset_event DROP CONSTRAINT IF EXISTS dagrun_dataset_events_dag_run_id_fkey" + ) + op.execute( + "ALTER TABLE dagrun_asset_event DROP CONSTRAINT IF EXISTS dagrun_dataset_event_dag_run_id_fkey" + ) + elif dialect == "mysql": + mysql_drop_foreignkey_if_exists("dagrun_dataset_events_dag_run_id_fkey", "dagrun_asset_event", op) + mysql_drop_foreignkey_if_exists("dagrun_dataset_event_dag_run_id_fkey", "dagrun_asset_event", op) + else: + # sqlite: Assuming no upgrade for sqlite from Airflow 2 + batch_op.drop_constraint("dagrun_dataset_event_dag_run_id_fkey", type_="foreignkey") _rename_index( batch_op=batch_op, original_name="idx_dagrun_dataset_events_event_id", @@ -356,6 +429,14 @@ def upgrade(): remote_cols=["id"], ondelete="CASCADE", ) + _rename_pk_constraint_unkown( + batch_op=batch_op, + table_name="dagrun_asset_event", + original_name="dagrun_dataset_events_pkey", + alternative_name="dagrun_dataset_event_pkey", + new_name="dagrun_asset_event_pkey", + columns=["event_id", "dag_run_id"], + ) with op.batch_alter_table("asset_event", schema=None) as batch_op: batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False) @@ -651,6 +732,12 @@ def downgrade(): remote_cols=["id"], ondelete="CASCADE", ) + _rename_pk_constraint( + batch_op=batch_op, + original_name="dagrun_asset_event_pkey", + new_name="dagrun_dataset_event_pkey", + columns=["event_id", "dag_run_id"], + ) with op.batch_alter_table("dataset_event", schema=None) as batch_op: batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False) diff --git a/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py b/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py index 771441bbb83..ac76e3bbd17 100644 --- a/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py +++ b/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py @@ -110,8 +110,17 @@ def upgrade(): for row in rows: id = uuid7() + if conn.dialect.name != "postgresql": + id = id.hex + else: + id = str(id) + conn.execute(stmt.bindparams(_id=id, dag_id=row.dag_id)) id2 = uuid7() + if conn.dialect.name != "postgresql": + id2 = id2.hex + else: + id2 = str(id2) # Update dagversion table conn.execute( sa.text(""" @@ -209,6 +218,10 @@ def upgrade(): """) for row in rows: id = uuid7() + if conn.dialect.name != "postgresql": + id = id.hex + else: + id = str(id) try: source_code = DagCode.get_code_from_file(row.fileloc) except FileNotFoundError: