ephraimbuddy commented on code in PR #63628:
URL: https://github.com/apache/airflow/pull/63628#discussion_r2958996788
##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -136,42 +156,79 @@ def migrate_all_data():
dag_run_table = table(
"dag_run",
column("id", sa.Integer()),
- column("dag_id", StringID()),
+ column("dag_id", sa.String()),
)
callback_table = table(
"callback",
column("id", sa.Uuid()),
column("type", sa.String(20)),
column("fetch_method", sa.String(20)),
- column("data", ExtendedJSON()),
+ column("data", sa.JSON()),
column("state", sa.String(10)),
column("priority_weight", sa.Integer()),
column("created_at", UtcDateTime(timezone=True)),
)
- conn = op.get_bind()
+ from datetime import datetime, timezone
+
+ timestamp = datetime.now(timezone.utc)
batch_num = 0
+
while True:
batch_num += 1
batch = conn.execute(
select(
deadline_table.c.id,
- deadline_table.c.dagrun_id,
- deadline_table.c.deadline_time,
deadline_table.c.callback,
deadline_table.c.callback_state,
dag_run_table.c.dag_id,
)
.join(dag_run_table, deadline_table.c.dagrun_id ==
dag_run_table.c.id)
- .where(deadline_table.c.callback_id.is_(None)) # Only get
rows that haven't been migrated yet
+ .where(deadline_table.c.callback_id.is_(None))
.limit(BATCH_SIZE)
).fetchall()
if not batch:
break
- migrate_batch(conn, deadline_table, callback_table, batch)
+ callback_inserts = []
+ deadline_updates = []
+
+ for row in batch:
+ callback_id = uuid6.uuid7()
+ cb = row.callback if isinstance(row.callback, dict) else
json.loads(row.callback)
Review Comment:
This copies `deadline.callback["__data__"]["kwargs"]` straight across
without first deserializing it from the Task SDK serde format. As a result,
complex kwargs are no longer converted into the native values that
`Callback.data` expects before `ExtendedJSON` re-encodes them, so datetimes,
timedeltas, tuples, sets, etc. will change shape after the upgrade.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]