vatsrahul1001 commented on code in PR #66016:
URL: https://github.com/apache/airflow/pull/66016#discussion_r3260489963
##########
airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py:
##########
@@ -38,17 +43,199 @@
airflow_version = "3.1.0"
+_ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback"
+# Maximum length of the callback VARCHAR column in the pre-0080 schema.
+_CALLBACK_MAX_LEN = 500
+
+
def upgrade():
"""Replace deadline table's string callback and JSON callback_kwargs with
JSON callback."""
+ if context.is_offline_mode():
+ print(
+ dedent("""
+ ------------
+ -- WARNING: Unable to migrate the data in the deadline table
+ -- while in offline mode! All rows in the deadline table will
+ -- be deleted.
+ ------------
+ """)
+ )
+ op.execute("DELETE FROM deadline")
+ with op.batch_alter_table("deadline", schema=None) as batch_op:
+ batch_op.drop_column("callback")
+ batch_op.drop_column("callback_kwargs")
+ batch_op.add_column(sa.Column("callback", sa.JSON(),
nullable=False))
+ return
+
+ conn = op.get_bind()
+ batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+ # Add the destination column alongside the existing ones so we can migrate
+ # in batches without loading the whole table into memory at once.
+ with op.batch_alter_table("deadline", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("callback_new", sa.JSON(),
nullable=True))
+
+ deadline_read = sa.table(
+ "deadline",
+ sa.column("id"),
+ sa.column("callback"),
+ sa.column("callback_kwargs", sa.JSON()),
+ sa.column("callback_new", sa.JSON()),
+ )
+ deadline_write = sa.table(
+ "deadline",
+ sa.column("id"),
+ sa.column("callback_new", sa.JSON()),
+ )
+
+ while True:
+ rows = conn.execute(
+ sa.select(
+ deadline_read.c.id,
+ deadline_read.c.callback,
+ deadline_read.c.callback_kwargs,
+ )
+ .where(deadline_read.c.callback_new.is_(None))
+ .limit(batch_size)
+ ).fetchall()
+
+ if not rows:
+ break
Review Comment:
It looks like CI is breaking after this was removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]