This is an automated email from the ASF dual-hosted git repository. jasonliu pushed a commit to branch refactor/db-migrations/replace-manual-call-with-disable_sqlite_fkeys in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ddee3a7f8e82f4f4b03f9b5b56299d045de49196 Author: LIU ZHE YOU <zhu424....@gmail.com> AuthorDate: Wed Jun 4 15:48:31 2025 +0800 Refactor migration with ctx manager to disable sqlite fkey --- airflow-core/docs/img/airflow_erd.sha256 | 2 +- airflow-core/src/airflow/migrations/utils.py | 12 +- ...nconsistency_between_ORM_and_migration_files.py | 127 +++++++++++---------- 3 files changed, 71 insertions(+), 70 deletions(-) diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index 51c2117fd5b..525bd9e84a8 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -173317aa67c36d8a257bc31c99eeedf906390cebdd9c6941d6e9c3db0515d5c5 \ No newline at end of file +aa6d364469ee98172ecd763c62c139caca2af6f50057c6db02dbcbfb742da9c7 \ No newline at end of file diff --git a/airflow-core/src/airflow/migrations/utils.py b/airflow-core/src/airflow/migrations/utils.py index 2dbbbece01a..957ca0a9490 100644 --- a/airflow-core/src/airflow/migrations/utils.py +++ b/airflow-core/src/airflow/migrations/utils.py @@ -52,13 +52,13 @@ def get_mssql_table_constraints(conn, table_name) -> dict[str, dict[str, list[st @contextmanager -def disable_sqlite_fkeys(op): - if op.get_bind().dialect.name == "sqlite": - op.execute("PRAGMA foreign_keys=off") - yield op - op.execute("PRAGMA foreign_keys=on") +def disable_sqlite_fkeys(conn): + if conn.dialect.name == "sqlite": + conn.execute(text("PRAGMA foreign_keys=off")) + yield conn + conn.execute(text("PRAGMA foreign_keys=on")) else: - yield op + yield conn def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op): diff --git a/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py b/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py index fa24916df6f..0aa329b20f7 100644 --- 
a/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py +++ b/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py @@ -31,6 +31,8 @@ import sqlalchemy as sa from alembic import op from sqlalchemy import literal +from airflow.migrations.utils import disable_sqlite_fkeys + # revision identifiers, used by Alembic. revision = "686269002441" down_revision = "bff083ad727d" @@ -76,31 +78,30 @@ def upgrade(): elif conn.dialect.name == "sqlite": # SQLite does not support DROP CONSTRAINT # We have to recreate the table without the constraint - conn.execute(sa.text("PRAGMA foreign_keys=off")) - conn.execute( - sa.text(""" - CREATE TABLE connection_new ( - id INTEGER NOT NULL, - conn_id VARCHAR(250) NOT NULL, - conn_type VARCHAR(500) NOT NULL, - host VARCHAR(500), - schema VARCHAR(500), - login TEXT, - password TEXT, - port INTEGER, - extra TEXT, - is_encrypted BOOLEAN, - is_extra_encrypted BOOLEAN, - description VARCHAR(5000), - CONSTRAINT connection_pkey PRIMARY KEY (id), - CONSTRAINT connection_conn_id_uq UNIQUE (conn_id) - ) - """) - ) - conn.execute(sa.text("INSERT INTO connection_new SELECT * FROM connection")) - conn.execute(sa.text("DROP TABLE connection")) - conn.execute(sa.text("ALTER TABLE connection_new RENAME TO connection")) - conn.execute(sa.text("PRAGMA foreign_keys=on")) + with disable_sqlite_fkeys(conn): + conn.execute( + sa.text(""" + CREATE TABLE connection_new ( + id INTEGER NOT NULL, + conn_id VARCHAR(250) NOT NULL, + conn_type VARCHAR(500) NOT NULL, + host VARCHAR(500), + schema VARCHAR(500), + login TEXT, + password TEXT, + port INTEGER, + extra TEXT, + is_encrypted BOOLEAN, + is_extra_encrypted BOOLEAN, + description VARCHAR(5000), + CONSTRAINT connection_pkey PRIMARY KEY (id), + CONSTRAINT connection_conn_id_uq UNIQUE (conn_id) + ) + """) + ) + conn.execute(sa.text("INSERT INTO connection_new SELECT * FROM connection")) + 
conn.execute(sa.text("DROP TABLE connection")) + conn.execute(sa.text("ALTER TABLE connection_new RENAME TO connection")) else: op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS unique_conn_id") # Dropping and recreating cause there's no IF NOT EXISTS @@ -213,45 +214,45 @@ def upgrade(): elif conn.dialect.name == "sqlite": # SQLite does not support DROP CONSTRAINT # We have to recreate the table without the constraint - conn.execute(sa.text("PRAGMA foreign_keys=off")) - conn.execute( - sa.text(""" - CREATE TABLE dag_run_new ( - id INTEGER NOT NULL, - dag_id VARCHAR(250) NOT NULL, - queued_at TIMESTAMP, - execution_date TIMESTAMP NOT NULL, - start_date TIMESTAMP, - end_date TIMESTAMP, - state VARCHAR(50), - run_id VARCHAR(250) NOT NULL, - creating_job_id INTEGER, - external_trigger BOOLEAN, - run_type VARCHAR(50) NOT NULL, - conf BLOB, - data_interval_start TIMESTAMP, - data_interval_end TIMESTAMP, - last_scheduling_decision TIMESTAMP, - dag_hash VARCHAR(32), - log_template_id INTEGER, - updated_at TIMESTAMP, - clear_number INTEGER DEFAULT '0' NOT NULL, - CONSTRAINT dag_run_pkey PRIMARY KEY (id), - CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, execution_date), - CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id), - CONSTRAINT task_instance_log_template_id_fkey FOREIGN KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION + with disable_sqlite_fkeys(conn): + conn.execute( + sa.text(""" + CREATE TABLE dag_run_new ( + id INTEGER NOT NULL, + dag_id VARCHAR(250) NOT NULL, + queued_at TIMESTAMP, + execution_date TIMESTAMP NOT NULL, + start_date TIMESTAMP, + end_date TIMESTAMP, + state VARCHAR(50), + run_id VARCHAR(250) NOT NULL, + creating_job_id INTEGER, + external_trigger BOOLEAN, + run_type VARCHAR(50) NOT NULL, + conf BLOB, + data_interval_start TIMESTAMP, + data_interval_end TIMESTAMP, + last_scheduling_decision TIMESTAMP, + dag_hash VARCHAR(32), + log_template_id INTEGER, + updated_at TIMESTAMP, + clear_number 
INTEGER DEFAULT '0' NOT NULL, + CONSTRAINT dag_run_pkey PRIMARY KEY (id), + CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, execution_date), + CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id), + CONSTRAINT task_instance_log_template_id_fkey FOREIGN KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION + ) + """) ) - """) - ) - headers = ( - "id, dag_id, queued_at, execution_date, start_date, end_date, state, run_id, creating_job_id, " - "external_trigger, run_type, conf, data_interval_start, data_interval_end, " - "last_scheduling_decision, dag_hash, log_template_id, updated_at, clear_number" - ) - conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT {headers} FROM dag_run")) - conn.execute(sa.text("DROP TABLE dag_run")) - conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run")) - conn.execute(sa.text("PRAGMA foreign_keys=on")) + headers = ( + "id, dag_id, queued_at, execution_date, start_date, end_date, state, run_id, creating_job_id, " + "external_trigger, run_type, conf, data_interval_start, data_interval_end, " + "last_scheduling_decision, dag_hash, log_template_id, updated_at, clear_number" + ) + conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT {headers} FROM dag_run")) + conn.execute(sa.text("DROP TABLE dag_run")) + conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run")) + with op.batch_alter_table("dag_run") as batch_op: batch_op.create_index("dag_id_state", ["dag_id", "state"], if_not_exists=True) batch_op.create_index("idx_dag_run_dag_id", ["dag_id"], if_not_exists=True)