amoghrajesh commented on code in PR #63628:
URL: https://github.com/apache/airflow/pull/63628#discussion_r2946360116
##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -44,70 +39,16 @@
depends_on = None
airflow_version = "3.2.0"
-BATCH_SIZE = 1000
Review Comment:
Let's keep the constant. I also think we could increase this value a bit,
to something like 5000.
##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -122,56 +63,159 @@ def migrate_all_data():
op.execute("DELETE FROM deadline")
return
- deadline_table = table(
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if dialect == "postgresql":
+ # PostgreSQL: use gen_random_uuid() and jsonb operations to avoid
Python
+ # deserialization. The callback JSON is serde-wrapped:
+ # {"__data__": {"path": "...", "kwargs": {...}},
"__classname__": "...", ...}
+ # We extract __data__ fields and merge in prefix + dag_id.
+ # A writable CTE handles both the INSERT into callback and the
UPDATE of
+ # deadline in a single statement, so the generated UUID is shared.
+ conn.execute(
+ sa.text("""
+ WITH new_callbacks AS (
+ SELECT
+ d.id AS deadline_id,
+ gen_random_uuid() AS callback_id,
+ jsonb_build_object(
+ 'path', d.callback->'__data__'->>'path',
+ 'kwargs', d.callback->'__data__'->'kwargs',
+ 'prefix', :prefix,
+ 'dag_id', dr.dag_id
+ )::json AS callback_data,
+ CASE
+ WHEN d.callback_state IN ('failed', 'success')
THEN d.callback_state
+ ELSE :pending
+ END AS cb_state,
+ CASE
+ WHEN d.callback_state IN ('failed', 'success')
THEN true
+ ELSE false
+ END AS is_missed
+ FROM deadline d
+ JOIN dag_run dr ON d.dagrun_id = dr.id
Review Comment:
And of course drop it at the end
##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -27,15 +27,10 @@
from __future__ import annotations
-from datetime import datetime, timezone
from textwrap import dedent
import sqlalchemy as sa
from alembic import context, op
-from sqlalchemy import column, select, table
Review Comment:
Let's not bloat the PR with unnecessary changes please. Let's continue using
`column`, `select`, `table` as is and not change to `sa.*`
##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -122,56 +63,159 @@ def migrate_all_data():
op.execute("DELETE FROM deadline")
return
- deadline_table = table(
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if dialect == "postgresql":
+ # PostgreSQL: use gen_random_uuid() and jsonb operations to avoid
Python
+ # deserialization. The callback JSON is serde-wrapped:
+ # {"__data__": {"path": "...", "kwargs": {...}},
"__classname__": "...", ...}
+ # We extract __data__ fields and merge in prefix + dag_id.
+ # A writable CTE handles both the INSERT into callback and the
UPDATE of
+ # deadline in a single statement, so the generated UUID is shared.
+ conn.execute(
+ sa.text("""
+ WITH new_callbacks AS (
+ SELECT
+ d.id AS deadline_id,
+ gen_random_uuid() AS callback_id,
+ jsonb_build_object(
+ 'path', d.callback->'__data__'->>'path',
+ 'kwargs', d.callback->'__data__'->'kwargs',
+ 'prefix', :prefix,
+ 'dag_id', dr.dag_id
+ )::json AS callback_data,
+ CASE
+ WHEN d.callback_state IN ('failed', 'success')
THEN d.callback_state
+ ELSE :pending
+ END AS cb_state,
+ CASE
+ WHEN d.callback_state IN ('failed', 'success')
THEN true
+ ELSE false
+ END AS is_missed
+ FROM deadline d
+ JOIN dag_run dr ON d.dagrun_id = dr.id
+ WHERE d.callback_id IS NULL
+ ),
+ inserted AS (
+ INSERT INTO callback (id, type, fetch_method, data,
state, priority_weight, created_at)
+ SELECT
+ callback_id, :cb_type, :fetch_method,
callback_data, cb_state, 1, NOW()
+ FROM new_callbacks
+ )
+ UPDATE deadline
+ SET callback_id = nc.callback_id, missed = nc.is_missed
+ FROM new_callbacks nc
+ WHERE deadline.id = nc.deadline_id
+ """),
+ {
+ "cb_type": _CALLBACK_TYPE_TRIGGERER,
+ "fetch_method": _CALLBACK_FETCH_METHOD_IMPORT_PATH,
+ "prefix": _CALLBACK_METRICS_PREFIX,
+ "pending": _CALLBACK_STATE_PENDING,
+ },
+ )
+ else:
+ # MySQL / SQLite: use batched Python approach with SQL JSON
extraction.
+ # UUID generation requires Python (no reliable cross-dialect SQL
UUID function
+ # that matches SQLAlchemy's Uuid column type).
+ _migrate_batched(conn, dialect)
+
+ def _migrate_batched(conn, dialect):
+ """Batch migration for MySQL/SQLite using Python UUIDs with SQL JSON
extraction."""
+ import json
+
+ import uuid6
+
+ from airflow.utils.sqlalchemy import UtcDateTime
+
+ deadline_table = sa.table(
"deadline",
- column("id", sa.Uuid()),
- column("dagrun_id", sa.Integer()),
- column("deadline_time", UtcDateTime(timezone=True)),
- column("callback", sa.JSON()),
- column("callback_state", sa.String(20)),
- column("missed", sa.Boolean()),
- column("callback_id", sa.Uuid()),
+ sa.column("id", sa.Uuid()),
+ sa.column("dagrun_id", sa.Integer()),
+ sa.column("callback", sa.JSON()),
+ sa.column("callback_state", sa.String(20)),
+ sa.column("missed", sa.Boolean()),
+ sa.column("callback_id", sa.Uuid()),
)
- dag_run_table = table(
+ dag_run_table = sa.table(
"dag_run",
- column("id", sa.Integer()),
- column("dag_id", StringID()),
+ sa.column("id", sa.Integer()),
+ sa.column("dag_id", sa.String()),
)
- callback_table = table(
+ callback_table = sa.table(
"callback",
- column("id", sa.Uuid()),
- column("type", sa.String(20)),
- column("fetch_method", sa.String(20)),
- column("data", ExtendedJSON()),
- column("state", sa.String(10)),
- column("priority_weight", sa.Integer()),
- column("created_at", UtcDateTime(timezone=True)),
+ sa.column("id", sa.Uuid()),
+ sa.column("type", sa.String(20)),
+ sa.column("fetch_method", sa.String(20)),
+ sa.column("data", sa.JSON()),
+ sa.column("state", sa.String(10)),
+ sa.column("priority_weight", sa.Integer()),
+ sa.column("created_at", UtcDateTime(timezone=True)),
)
- conn = op.get_bind()
+ from datetime import datetime, timezone
Review Comment:
Top level import please
##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -122,56 +63,159 @@ def migrate_all_data():
op.execute("DELETE FROM deadline")
return
- deadline_table = table(
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if dialect == "postgresql":
+ # PostgreSQL: use gen_random_uuid() and jsonb operations to avoid
Python
+ # deserialization. The callback JSON is serde-wrapped:
+ # {"__data__": {"path": "...", "kwargs": {...}},
"__classname__": "...", ...}
+ # We extract __data__ fields and merge in prefix + dag_id.
+ # A writable CTE handles both the INSERT into callback and the
UPDATE of
+ # deadline in a single statement, so the generated UUID is shared.
+ conn.execute(
+ sa.text("""
+ WITH new_callbacks AS (
+ SELECT
+ d.id AS deadline_id,
+ gen_random_uuid() AS callback_id,
+ jsonb_build_object(
+ 'path', d.callback->'__data__'->>'path',
+ 'kwargs', d.callback->'__data__'->'kwargs',
+ 'prefix', :prefix,
+ 'dag_id', dr.dag_id
+ )::json AS callback_data,
+ CASE
+ WHEN d.callback_state IN ('failed', 'success')
THEN d.callback_state
+ ELSE :pending
+ END AS cb_state,
+ CASE
+ WHEN d.callback_state IN ('failed', 'success')
THEN true
+ ELSE false
+ END AS is_missed
+ FROM deadline d
+ JOIN dag_run dr ON d.dagrun_id = dr.id
Review Comment:
I think this is significantly adding to the time complexity. How about we
add a new temporary column to the deadline table before the migration starts?
Something like:
```python
op.add_column("deadline", sa.Column("temp_dag_id", sa.String(250),
nullable=True))
op.execute("""
UPDATE deadline
SET temp_dag_id = (SELECT dag_id FROM dag_run WHERE id =
deadline.dagrun_id)
WHERE temp_dag_id IS NULL
""")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]