This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch refactor/db-migrations/replace-manual-call-with-disable_sqlite_fkeys
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ddee3a7f8e82f4f4b03f9b5b56299d045de49196
Author: LIU ZHE YOU <zhu424....@gmail.com>
AuthorDate: Wed Jun 4 15:48:31 2025 +0800

    Refactor migration with ctx manager to disable sqlite fkey
---
 airflow-core/docs/img/airflow_erd.sha256           |   2 +-
 airflow-core/src/airflow/migrations/utils.py       |  12 +-
 ...nconsistency_between_ORM_and_migration_files.py | 127 +++++++++++----------
 3 files changed, 71 insertions(+), 70 deletions(-)

diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256
index 51c2117fd5b..525bd9e84a8 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-173317aa67c36d8a257bc31c99eeedf906390cebdd9c6941d6e9c3db0515d5c5
\ No newline at end of file
+aa6d364469ee98172ecd763c62c139caca2af6f50057c6db02dbcbfb742da9c7
\ No newline at end of file
diff --git a/airflow-core/src/airflow/migrations/utils.py b/airflow-core/src/airflow/migrations/utils.py
index 2dbbbece01a..957ca0a9490 100644
--- a/airflow-core/src/airflow/migrations/utils.py
+++ b/airflow-core/src/airflow/migrations/utils.py
@@ -52,13 +52,13 @@ def get_mssql_table_constraints(conn, table_name) -> dict[str, dict[str, list[st
 
 
 @contextmanager
-def disable_sqlite_fkeys(op):
-    if op.get_bind().dialect.name == "sqlite":
-        op.execute("PRAGMA foreign_keys=off")
-        yield op
-        op.execute("PRAGMA foreign_keys=on")
+def disable_sqlite_fkeys(conn):
+    if conn.dialect.name == "sqlite":
+        conn.execute(text("PRAGMA foreign_keys=off"))
+        yield conn
+        conn.execute(text("PRAGMA foreign_keys=on"))
     else:
-        yield op
+        yield conn
 
 
 def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op):
diff --git a/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py b/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
index fa24916df6f..0aa329b20f7 100644
--- a/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
+++ b/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
@@ -31,6 +31,8 @@ import sqlalchemy as sa
 from alembic import op
 from sqlalchemy import literal
 
+from airflow.migrations.utils import disable_sqlite_fkeys
+
 # revision identifiers, used by Alembic.
 revision = "686269002441"
 down_revision = "bff083ad727d"
@@ -76,31 +78,30 @@ def upgrade():
     elif conn.dialect.name == "sqlite":
         # SQLite does not support DROP CONSTRAINT
         # We have to recreate the table without the constraint
-        conn.execute(sa.text("PRAGMA foreign_keys=off"))
-        conn.execute(
-            sa.text("""
-        CREATE TABLE connection_new (
-                id INTEGER NOT NULL,
-                conn_id VARCHAR(250) NOT NULL,
-                conn_type VARCHAR(500) NOT NULL,
-                host VARCHAR(500),
-                schema VARCHAR(500),
-                login TEXT,
-                password TEXT,
-                port INTEGER,
-                extra TEXT,
-                is_encrypted BOOLEAN,
-                is_extra_encrypted BOOLEAN,
-                description VARCHAR(5000),
-                CONSTRAINT connection_pkey PRIMARY KEY (id),
-                CONSTRAINT connection_conn_id_uq UNIQUE (conn_id)
-        )
-        """)
-        )
-        conn.execute(sa.text("INSERT INTO connection_new SELECT * FROM 
connection"))
-        conn.execute(sa.text("DROP TABLE connection"))
-        conn.execute(sa.text("ALTER TABLE connection_new RENAME TO 
connection"))
-        conn.execute(sa.text("PRAGMA foreign_keys=on"))
+        with disable_sqlite_fkeys(conn):
+            conn.execute(
+                sa.text("""
+            CREATE TABLE connection_new (
+                    id INTEGER NOT NULL,
+                    conn_id VARCHAR(250) NOT NULL,
+                    conn_type VARCHAR(500) NOT NULL,
+                    host VARCHAR(500),
+                    schema VARCHAR(500),
+                    login TEXT,
+                    password TEXT,
+                    port INTEGER,
+                    extra TEXT,
+                    is_encrypted BOOLEAN,
+                    is_extra_encrypted BOOLEAN,
+                    description VARCHAR(5000),
+                    CONSTRAINT connection_pkey PRIMARY KEY (id),
+                    CONSTRAINT connection_conn_id_uq UNIQUE (conn_id)
+            )
+            """)
+            )
+            conn.execute(sa.text("INSERT INTO connection_new SELECT * FROM 
connection"))
+            conn.execute(sa.text("DROP TABLE connection"))
+            conn.execute(sa.text("ALTER TABLE connection_new RENAME TO 
connection"))
     else:
         op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS 
unique_conn_id")
         # Dropping and recreating cause there's no IF NOT EXISTS
@@ -213,45 +214,45 @@ def upgrade():
     elif conn.dialect.name == "sqlite":
         # SQLite does not support DROP CONSTRAINT
         # We have to recreate the table without the constraint
-        conn.execute(sa.text("PRAGMA foreign_keys=off"))
-        conn.execute(
-            sa.text("""
-            CREATE TABLE dag_run_new (
-                id INTEGER NOT NULL,
-                dag_id VARCHAR(250) NOT NULL,
-                queued_at TIMESTAMP,
-                execution_date TIMESTAMP NOT NULL,
-                start_date TIMESTAMP,
-                end_date TIMESTAMP,
-                state VARCHAR(50),
-                run_id VARCHAR(250) NOT NULL,
-                creating_job_id INTEGER,
-                external_trigger BOOLEAN,
-                run_type VARCHAR(50) NOT NULL,
-                conf BLOB,
-                data_interval_start TIMESTAMP,
-                data_interval_end TIMESTAMP,
-                last_scheduling_decision TIMESTAMP,
-                dag_hash VARCHAR(32),
-                log_template_id INTEGER,
-                updated_at TIMESTAMP,
-                clear_number INTEGER DEFAULT '0' NOT NULL,
-                CONSTRAINT dag_run_pkey PRIMARY KEY (id),
-                CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, 
execution_date),
-                CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id),
-                CONSTRAINT task_instance_log_template_id_fkey FOREIGN 
KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION
+        with disable_sqlite_fkeys(conn):
+            conn.execute(
+                sa.text("""
+                CREATE TABLE dag_run_new (
+                    id INTEGER NOT NULL,
+                    dag_id VARCHAR(250) NOT NULL,
+                    queued_at TIMESTAMP,
+                    execution_date TIMESTAMP NOT NULL,
+                    start_date TIMESTAMP,
+                    end_date TIMESTAMP,
+                    state VARCHAR(50),
+                    run_id VARCHAR(250) NOT NULL,
+                    creating_job_id INTEGER,
+                    external_trigger BOOLEAN,
+                    run_type VARCHAR(50) NOT NULL,
+                    conf BLOB,
+                    data_interval_start TIMESTAMP,
+                    data_interval_end TIMESTAMP,
+                    last_scheduling_decision TIMESTAMP,
+                    dag_hash VARCHAR(32),
+                    log_template_id INTEGER,
+                    updated_at TIMESTAMP,
+                    clear_number INTEGER DEFAULT '0' NOT NULL,
+                    CONSTRAINT dag_run_pkey PRIMARY KEY (id),
+                    CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE 
(dag_id, execution_date),
+                    CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, 
run_id),
+                    CONSTRAINT task_instance_log_template_id_fkey FOREIGN 
KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION
+                )
+            """)
             )
-        """)
-        )
-        headers = (
-            "id, dag_id, queued_at, execution_date, start_date, end_date, 
state, run_id, creating_job_id, "
-            "external_trigger, run_type, conf, data_interval_start, 
data_interval_end, "
-            "last_scheduling_decision, dag_hash, log_template_id, updated_at, 
clear_number"
-        )
-        conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT 
{headers} FROM dag_run"))
-        conn.execute(sa.text("DROP TABLE dag_run"))
-        conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
-        conn.execute(sa.text("PRAGMA foreign_keys=on"))
+            headers = (
+                "id, dag_id, queued_at, execution_date, start_date, end_date, 
state, run_id, creating_job_id, "
+                "external_trigger, run_type, conf, data_interval_start, 
data_interval_end, "
+                "last_scheduling_decision, dag_hash, log_template_id, 
updated_at, clear_number"
+            )
+            conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT 
{headers} FROM dag_run"))
+            conn.execute(sa.text("DROP TABLE dag_run"))
+            conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
+
         with op.batch_alter_table("dag_run") as batch_op:
             batch_op.create_index("dag_id_state", ["dag_id", "state"], 
if_not_exists=True)
             batch_op.create_index("idx_dag_run_dag_id", ["dag_id"], 
if_not_exists=True)

Reply via email to