amoghrajesh commented on code in PR #63628:
URL: https://github.com/apache/airflow/pull/63628#discussion_r2946360116


##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -44,70 +39,16 @@
 depends_on = None
 airflow_version = "3.2.0"
 
-BATCH_SIZE = 1000

Review Comment:
   Let's keep the constant. I also think we could increase this value a bit, to 5000 maybe.
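
   A minimal sketch of how the kept constant would bound each migration round trip. The 5000 value is only the suggestion floated above (the original used 1000), and `iter_batches` is a hypothetical helper, not code from the PR:
   ```python
   # Hypothetical helper: BATCH_SIZE caps how many rows each INSERT/UPDATE touches.
   # 5000 is the reviewer-suggested value; the original migration used 1000.
   BATCH_SIZE = 5000

   def iter_batches(rows, batch_size=BATCH_SIZE):
       """Yield successive slices so each batch touches at most batch_size rows."""
       for start in range(0, len(rows), batch_size):
           yield rows[start:start + batch_size]

   sizes = [len(batch) for batch in iter_batches(list(range(12_001)))]
   print(sizes)  # [5000, 5000, 2001]
   ```
   Larger batches mean fewer round trips at the cost of bigger transactions; 5000 is a middle ground, not a measured optimum.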



##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -122,56 +63,159 @@ def migrate_all_data():
             op.execute("DELETE FROM deadline")
             return
 
-        deadline_table = table(
+        conn = op.get_bind()
+        dialect = conn.dialect.name
+
+        if dialect == "postgresql":
+            # PostgreSQL: use gen_random_uuid() and jsonb operations to avoid Python
+            # deserialization. The callback JSON is serde-wrapped:
+            #   {"__data__": {"path": "...", "kwargs": {...}}, "__classname__": "...", ...}
+            # We extract __data__ fields and merge in prefix + dag_id.
+            # A writable CTE handles both the INSERT into callback and the UPDATE of
+            # deadline in a single statement, so the generated UUID is shared.
+            conn.execute(
+                sa.text("""
+                    WITH new_callbacks AS (
+                        SELECT
+                            d.id AS deadline_id,
+                            gen_random_uuid() AS callback_id,
+                            jsonb_build_object(
+                                'path', d.callback->'__data__'->>'path',
+                                'kwargs', d.callback->'__data__'->'kwargs',
+                                'prefix', :prefix,
+                                'dag_id', dr.dag_id
+                            )::json AS callback_data,
+                            CASE
+                                WHEN d.callback_state IN ('failed', 'success') THEN d.callback_state
+                                ELSE :pending
+                            END AS cb_state,
+                            CASE
+                                WHEN d.callback_state IN ('failed', 'success') THEN true
+                                ELSE false
+                            END AS is_missed
+                        FROM deadline d
+                        JOIN dag_run dr ON d.dagrun_id = dr.id

Review Comment:
   And ofc drop it at the end



##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -27,15 +27,10 @@
 
 from __future__ import annotations
 
-from datetime import datetime, timezone
 from textwrap import dedent
 
 import sqlalchemy as sa
 from alembic import context, op
-from sqlalchemy import column, select, table

Review Comment:
   Let's not bloat the PR with unnecessary changes, please. Let's continue using `column`, `select`, `table` as-is and not change to `sa.*`.



##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -122,56 +63,159 @@ def migrate_all_data():
             op.execute("DELETE FROM deadline")
             return
 
-        deadline_table = table(
+        conn = op.get_bind()
+        dialect = conn.dialect.name
+
+        if dialect == "postgresql":
+            # PostgreSQL: use gen_random_uuid() and jsonb operations to avoid Python
+            # deserialization. The callback JSON is serde-wrapped:
+            #   {"__data__": {"path": "...", "kwargs": {...}}, "__classname__": "...", ...}
+            # We extract __data__ fields and merge in prefix + dag_id.
+            # A writable CTE handles both the INSERT into callback and the UPDATE of
+            # deadline in a single statement, so the generated UUID is shared.
+            conn.execute(
+                sa.text("""
+                    WITH new_callbacks AS (
+                        SELECT
+                            d.id AS deadline_id,
+                            gen_random_uuid() AS callback_id,
+                            jsonb_build_object(
+                                'path', d.callback->'__data__'->>'path',
+                                'kwargs', d.callback->'__data__'->'kwargs',
+                                'prefix', :prefix,
+                                'dag_id', dr.dag_id
+                            )::json AS callback_data,
+                            CASE
+                                WHEN d.callback_state IN ('failed', 'success') THEN d.callback_state
+                                ELSE :pending
+                            END AS cb_state,
+                            CASE
+                                WHEN d.callback_state IN ('failed', 'success') THEN true
+                                ELSE false
+                            END AS is_missed
+                        FROM deadline d
+                        JOIN dag_run dr ON d.dagrun_id = dr.id
+                        WHERE d.callback_id IS NULL
+                    ),
+                    inserted AS (
+                        INSERT INTO callback (id, type, fetch_method, data, state, priority_weight, created_at)
+                        SELECT
+                            callback_id, :cb_type, :fetch_method, callback_data, cb_state, 1, NOW()
+                        FROM new_callbacks
+                    )
+                    UPDATE deadline
+                    SET callback_id = nc.callback_id, missed = nc.is_missed
+                    FROM new_callbacks nc
+                    WHERE deadline.id = nc.deadline_id
+                """),
+                {
+                    "cb_type": _CALLBACK_TYPE_TRIGGERER,
+                    "fetch_method": _CALLBACK_FETCH_METHOD_IMPORT_PATH,
+                    "prefix": _CALLBACK_METRICS_PREFIX,
+                    "pending": _CALLBACK_STATE_PENDING,
+                },
+            )
+        else:
+            # MySQL / SQLite: use batched Python approach with SQL JSON extraction.
+            # UUID generation requires Python (no reliable cross-dialect SQL UUID function
+            # that matches SQLAlchemy's Uuid column type).
+            _migrate_batched(conn, dialect)
+
+    def _migrate_batched(conn, dialect):
+        """Batch migration for MySQL/SQLite using Python UUIDs with SQL JSON extraction."""
+        import json
+
+        import uuid6
+
+        from airflow.utils.sqlalchemy import UtcDateTime
+
+        deadline_table = sa.table(
             "deadline",
-            column("id", sa.Uuid()),
-            column("dagrun_id", sa.Integer()),
-            column("deadline_time", UtcDateTime(timezone=True)),
-            column("callback", sa.JSON()),
-            column("callback_state", sa.String(20)),
-            column("missed", sa.Boolean()),
-            column("callback_id", sa.Uuid()),
+            sa.column("id", sa.Uuid()),
+            sa.column("dagrun_id", sa.Integer()),
+            sa.column("callback", sa.JSON()),
+            sa.column("callback_state", sa.String(20)),
+            sa.column("missed", sa.Boolean()),
+            sa.column("callback_id", sa.Uuid()),
         )
 
-        dag_run_table = table(
+        dag_run_table = sa.table(
             "dag_run",
-            column("id", sa.Integer()),
-            column("dag_id", StringID()),
+            sa.column("id", sa.Integer()),
+            sa.column("dag_id", sa.String()),
         )
 
-        callback_table = table(
+        callback_table = sa.table(
             "callback",
-            column("id", sa.Uuid()),
-            column("type", sa.String(20)),
-            column("fetch_method", sa.String(20)),
-            column("data", ExtendedJSON()),
-            column("state", sa.String(10)),
-            column("priority_weight", sa.Integer()),
-            column("created_at", UtcDateTime(timezone=True)),
+            sa.column("id", sa.Uuid()),
+            sa.column("type", sa.String(20)),
+            sa.column("fetch_method", sa.String(20)),
+            sa.column("data", sa.JSON()),
+            sa.column("state", sa.String(10)),
+            sa.column("priority_weight", sa.Integer()),
+            sa.column("created_at", UtcDateTime(timezone=True)),
         )
 
-        conn = op.get_bind()
+        from datetime import datetime, timezone

Review Comment:
   Top-level import, please.
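
   A sketch of the requested change: the function-local `from datetime import ...` hoisted to module scope. Names match the diff, but the surrounding migration code is elided and `_created_at` is a hypothetical helper for illustration:
   ```python
   # At the top of the migration module: import once, at module level,
   # instead of inside _migrate_batched().
   from datetime import datetime, timezone

   def _created_at():
       # Uses the top-level import; mirrors how a created_at value would be stamped.
       return datetime.now(timezone.utc)

   print(_created_at().tzinfo is timezone.utc)  # True
   ```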



##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -122,56 +63,159 @@ def migrate_all_data():
             op.execute("DELETE FROM deadline")
             return
 
-        deadline_table = table(
+        conn = op.get_bind()
+        dialect = conn.dialect.name
+
+        if dialect == "postgresql":
+            # PostgreSQL: use gen_random_uuid() and jsonb operations to avoid Python
+            # deserialization. The callback JSON is serde-wrapped:
+            #   {"__data__": {"path": "...", "kwargs": {...}}, "__classname__": "...", ...}
+            # We extract __data__ fields and merge in prefix + dag_id.
+            # A writable CTE handles both the INSERT into callback and the UPDATE of
+            # deadline in a single statement, so the generated UUID is shared.
+            conn.execute(
+                sa.text("""
+                    WITH new_callbacks AS (
+                        SELECT
+                            d.id AS deadline_id,
+                            gen_random_uuid() AS callback_id,
+                            jsonb_build_object(
+                                'path', d.callback->'__data__'->>'path',
+                                'kwargs', d.callback->'__data__'->'kwargs',
+                                'prefix', :prefix,
+                                'dag_id', dr.dag_id
+                            )::json AS callback_data,
+                            CASE
+                                WHEN d.callback_state IN ('failed', 'success') THEN d.callback_state
+                                ELSE :pending
+                            END AS cb_state,
+                            CASE
+                                WHEN d.callback_state IN ('failed', 'success') THEN true
+                                ELSE false
+                            END AS is_missed
+                        FROM deadline d
+                        JOIN dag_run dr ON d.dagrun_id = dr.id

Review Comment:
   I think this JOIN adds significantly to the migration's time complexity. How about we add a temporary column to the deadline table before the migration starts?
   
   Something like:
   ```python
   op.add_column("deadline", sa.Column("temp_dag_id", sa.String(250), nullable=True))
   op.execute("""
       UPDATE deadline
       SET temp_dag_id = (SELECT dag_id FROM dag_run WHERE id = deadline.dagrun_id)
       WHERE temp_dag_id IS NULL
   """)
   ```
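
   As a sanity check of the suggested pattern, here is a self-contained sketch against an in-memory SQLite database. Table shapes are reduced to just the columns involved; the real migration would use `op.add_column`/`op.execute`/`op.drop_column`, and `DROP COLUMN` needs SQLite 3.35+:
   ```python
   import sqlite3

   conn = sqlite3.connect(":memory:")
   conn.executescript("""
       CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT);
       CREATE TABLE deadline (id INTEGER PRIMARY KEY, dagrun_id INTEGER);
       INSERT INTO dag_run VALUES (1, 'example_dag');
       INSERT INTO deadline VALUES (10, 1);
   """)

   # 1. Add the temporary helper column before the data migration starts.
   conn.execute("ALTER TABLE deadline ADD COLUMN temp_dag_id TEXT")

   # 2. Denormalize dag_id once, so later batches can read it without the JOIN.
   conn.execute("""
       UPDATE deadline
       SET temp_dag_id = (SELECT dag_id FROM dag_run WHERE id = deadline.dagrun_id)
       WHERE temp_dag_id IS NULL
   """)
   print(conn.execute("SELECT temp_dag_id FROM deadline WHERE id = 10").fetchone()[0])  # example_dag

   # 3. Drop the helper column at the end so it never reaches the final schema.
   conn.execute("ALTER TABLE deadline DROP COLUMN temp_dag_id")
   ```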



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
