jedcunningham commented on code in PR #48070:
URL: https://github.com/apache/airflow/pull/48070#discussion_r2012243088


##########
airflow-core/src/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py:
##########
@@ -39,31 +44,110 @@
 
 def upgrade():
     """Apply Drop ab_user.id foreign key."""
-    with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
-        batch_op.drop_constraint("dag_run_note_user_fkey", type_="foreignkey")
-
-    with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
-        batch_op.drop_constraint("task_instance_note_user_fkey", 
type_="foreignkey")
-
-    if op.get_bind().dialect.name == "mysql":
-        with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
-            batch_op.drop_index("dag_run_note_user_fkey")
-
-        with op.batch_alter_table("task_instance_note", schema=None) as 
batch_op:
-            batch_op.drop_index("task_instance_note_user_fkey")
+    conn = op.get_bind()
+    dialect = conn.dialect.name
+    if dialect == "mysql":
+        mysql_drop_foreignkey_if_exists("dag_run_note", 
"dag_run_note_user_fkey", op)
+        mysql_drop_foreignkey_if_exists("task_instance_note", 
"task_instance_note_user_fkey", op)
+
+    elif dialect == "postgresql":
+        conn.execute(sa.text("ALTER TABLE dag_run_note DROP CONSTRAINT IF 
EXISTS dag_run_note_user_fkey"))
+        conn.execute(
+            sa.text("ALTER TABLE task_instance_note DROP CONSTRAINT IF EXISTS 
task_instance_note_user_fkey")
+        )
+    else:
+        # SQLite does not support DROP CONSTRAINT
+        # We have to recreate the table without the constraint
+        conn.execute(sa.text("PRAGMA foreign_keys=off"))
+        dag_run_note_new = sa.Table(
+            "dag_run_note_new",
+            metadata,
+            sa.Column("user_id", sa.Integer),
+            sa.Column("dag_run_id", sa.Integer, primary_key=True, 
nullable=False),
+            sa.Column("content", sa.String(1000)),
+            sa.Column("created_at", TIMESTAMP(), nullable=False),
+            sa.Column("updated_at", TIMESTAMP(), nullable=False),
+            sa.ForeignKeyConstraint(
+                ("dag_run_id",), ["dag_run.id"], name="dag_run_note_dr_fkey", 
ondelete="CASCADE"
+            ),
+        )
+        dag_run_note_new.create(bind=op.get_bind())
+        conn.execute(
+            sa.text("""
+            INSERT INTO dag_run_note_new (user_id, dag_run_id, content, 
created_at, updated_at)
+            SELECT user_id, dag_run_id, content, created_at, updated_at FROM 
dag_run_note
+            """)
+        )
+        conn.execute(sa.text("DROP TABLE dag_run_note"))
+        conn.execute(sa.text("ALTER TABLE dag_run_note_new RENAME TO 
dag_run_note"))
+
+        # task_instance_note
+        task_instance_note_new = sa.Table(
+            "task_instance_note_new",
+            metadata,
+            sa.Column("user_id", sa.Integer),
+            sa.Column("task_id", sa.Integer, primary_key=True, nullable=False),
+            sa.Column("dag_id", sa.String(250), primary_key=True, 
nullable=False),
+            sa.Column("run_id", sa.String(250), primary_key=True, 
nullable=False),
+            sa.Column("map_index", sa.Integer, primary_key=True, 
nullable=False),
+            sa.Column("content", sa.String(1000)),
+            sa.Column("created_at", TIMESTAMP(), nullable=False),
+            sa.Column("updated_at", TIMESTAMP(), nullable=False),
+            sa.PrimaryKeyConstraint(
+                "task_id", "dag_id", "run_id", "map_index", 
name="task_instance_note_pkey"
+            ),
+            sa.ForeignKeyConstraint(
+                (
+                    "task_id",
+                    "dag_id",
+                    "run_id",
+                    "map_index",
+                ),
+                [
+                    "task_instance.task_id",
+                    "task_instance.dag_id",
+                    "task_instance.run_id",
+                    "task_instance.map_index",
+                ],
+                name="task_instance_note_ti_fkey",
+                ondelete="CASCADE",
+            ),
+        )
+        task_instance_note_new.create(bind=op.get_bind())
+        conn.execute(
+            sa.text("""
+            INSERT INTO task_instance_note_new (user_id, task_id, dag_id, 
run_id, map_index, content, created_at, updated_at)
+            SELECT user_id, task_id, dag_id, run_id, map_index, content, 
created_at, updated_at FROM task_instance_note
+            """)
+        )
+        conn.execute(sa.text("DROP TABLE task_instance_note"))
+        conn.execute(sa.text("ALTER TABLE task_instance_note_new RENAME TO 
task_instance_note"))
+        conn.execute(sa.text("PRAGMA foreign_keys=on"))
 
 
 def downgrade():
     """Unapply Drop ab_user.id foreign key."""
-    with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
-        batch_op.create_foreign_key("task_instance_note_user_fkey", "ab_user", 
["user_id"], ["id"])
-
-    with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
-        batch_op.create_foreign_key("dag_run_note_user_fkey", "ab_user", 
["user_id"], ["id"])
-
-    if op.get_bind().dialect.name == "mysql":
+    conn = op.get_bind()
+    dialect = conn.dialect.name
+
+    if dialect == "sqlite":
+        table_exists = conn.execute(
+            sa.text("SELECT name FROM sqlite_master WHERE type='table' AND 
name='ab_user'")
+        ).fetchone()
+    else:
+        table_exists = conn.execute(
+            sa.text("SELECT 1 FROM information_schema.tables WHERE table_name 
= 'ab_user'")
+        ).scalar()
+    if table_exists:

Review Comment:
   Weren't we going to enforce that the `ab_user` table exists? We can't just 
make this part of the downgrade conditional on the table being present.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to