This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new e33c99f1500 [v3-2-test] fix: migrate existing deadline rows in
migration 0080 upgrade and downgrade (#66016) (#67129)
e33c99f1500 is described below
commit e33c99f15004ee40d03599b26215845031eaceaf
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 19 10:04:49 2026 +0530
[v3-2-test] fix: migrate existing deadline rows in migration 0080 upgrade
and downgrade (#66016) (#67129)
[v3-2-test] fix: migrate existing deadline rows in migration 0080 upgrade
and downgrade (#66016) (#67129)
---------
(cherry picked from commit c8a6c55cac8031c4131b5b0e064c45cb232b9966)
Co-authored-by: Rahul Vats <[email protected]>
---
.../0080_3_1_0_modify_deadline_callback_schema.py | 195 ++++++++++++++-
...0_replace_deadline_inline_callback_with_fkey.py | 31 ++-
.../test_0080_deadline_callback_migration.py | 268 +++++++++++++++++++++
.../test_0094_deadline_callback_migration.py | 178 ++++++++++++++
4 files changed, 662 insertions(+), 10 deletions(-)
diff --git
a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
index adb3354512d..23b00e1a1eb 100644
---
a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
+++
b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
@@ -27,8 +27,13 @@ Create Date: 2025-07-31 19:35:53.150465
from __future__ import annotations
+import json
+from textwrap import dedent
+
import sqlalchemy as sa
-from alembic import op
+from alembic import context, op
+
+from airflow.configuration import conf
# revision identifiers, used by Alembic.
revision = "808787349f22"
@@ -38,17 +43,199 @@ depends_on = None
airflow_version = "3.1.0"
+_ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback"
+# Maximum length of the callback VARCHAR column in the pre-0080 schema.
+_CALLBACK_MAX_LEN = 500
+
+
def upgrade():
"""Replace deadline table's string callback and JSON callback_kwargs with
JSON callback."""
+ if context.is_offline_mode():
+ print(
+ dedent("""
+ ------------
+ -- WARNING: Unable to migrate the data in the deadline table
+ -- while in offline mode! All rows in the deadline table will
+ -- be deleted.
+ ------------
+ """)
+ )
+ op.execute("DELETE FROM deadline")
+ with op.batch_alter_table("deadline", schema=None) as batch_op:
+ batch_op.drop_column("callback")
+ batch_op.drop_column("callback_kwargs")
+ batch_op.add_column(sa.Column("callback", sa.JSON(),
nullable=False))
+ return
+
+ conn = op.get_bind()
+ batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+ # Add the destination column alongside the existing ones so we can migrate
+ # in batches without loading the whole table into memory at once.
+ with op.batch_alter_table("deadline", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("callback_new", sa.JSON(),
nullable=True))
+
+ deadline_read = sa.table(
+ "deadline",
+ sa.column("id"),
+ sa.column("callback"),
+ sa.column("callback_kwargs", sa.JSON()),
+ sa.column("callback_new", sa.JSON()),
+ )
+ deadline_write = sa.table(
+ "deadline",
+ sa.column("id"),
+ sa.column("callback_new", sa.JSON()),
+ )
+
+ while True:
+ rows = conn.execute(
+ sa.select(
+ deadline_read.c.id,
+ deadline_read.c.callback,
+ deadline_read.c.callback_kwargs,
+ )
+ .where(deadline_read.c.callback_new.is_(None))
+ .limit(batch_size)
+ ).fetchall()
+
+ if not rows:
+ break
+
+ batch = []
+ for row in rows:
+ path = row[1] or ""
+ kwargs = row[2]
+ if isinstance(kwargs, str):
+ kwargs = json.loads(kwargs) if kwargs else {}
+ if not isinstance(kwargs, dict):
+ kwargs = {}
+ batch.append(
+ {
+ "row_id": row[0],
+ "new_callback": {
+ "__data__": {"path": path, "kwargs": kwargs},
+ "__classname__": _ASYNC_CALLBACK_CLASSNAME,
+ "__version__": 0,
+ },
+ }
+ )
+
+ conn.execute(
+ sa.update(deadline_write)
+ .where(deadline_write.c.id == sa.bindparam("row_id"))
+ .values(callback_new=sa.bindparam("new_callback")),
+ batch,
+ )
+
+ if len(rows) < batch_size:
+ break
+
with op.batch_alter_table("deadline", schema=None) as batch_op:
batch_op.drop_column("callback")
batch_op.drop_column("callback_kwargs")
- batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False))
+ batch_op.alter_column(
+ "callback_new",
+ new_column_name="callback",
+ existing_type=sa.JSON(),
+ nullable=False,
+ )
def downgrade():
"""Replace deadline table's JSON callback with string callback and JSON
callback_kwargs."""
+ if context.is_offline_mode():
+ print(
+ dedent("""
+ ------------
+ -- WARNING: Unable to migrate the data in the deadline table
+ -- while in offline mode! All rows in the deadline table will
+ -- be deleted.
+ ------------
+ """)
+ )
+ op.execute("DELETE FROM deadline")
+ with op.batch_alter_table("deadline", schema=None) as batch_op:
+ batch_op.drop_column("callback")
+ batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(),
nullable=True))
+ batch_op.add_column(sa.Column("callback", sa.String(length=500),
nullable=False))
+ return
+
+ conn = op.get_bind()
+ batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+ # Add the restored columns alongside the existing JSON callback so we can
+ # back-fill in batches before dropping the JSON column.
with op.batch_alter_table("deadline", schema=None) as batch_op:
- batch_op.drop_column("callback")
+ batch_op.add_column(sa.Column("callback_old", sa.String(length=500),
nullable=True))
batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(),
nullable=True))
- batch_op.add_column(sa.Column("callback", sa.String(length=500),
nullable=False))
+
+ deadline_read = sa.table(
+ "deadline",
+ sa.column("id"),
+ sa.column("callback", sa.JSON()),
+ sa.column("callback_old", sa.String(500)),
+ )
+ deadline_write = sa.table(
+ "deadline",
+ sa.column("id"),
+ sa.column("callback_old", sa.String(500)),
+ sa.column("callback_kwargs", sa.JSON()),
+ )
+
+ while True:
+ rows = conn.execute(
+ sa.select(deadline_read.c.id, deadline_read.c.callback)
+ .where(deadline_read.c.callback_old.is_(None))
+ .limit(batch_size)
+ ).fetchall()
+
+ if not rows:
+ break
+
+ batch = []
+ for row in rows:
+ cb = row[1]
+ if cb is None:
+ path, kwargs = "", {}
+ else:
+ if isinstance(cb, str):
+ cb = json.loads(cb)
+ cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else
{}
+ path = cb_inner.get("path", "")
+ if len(path) > _CALLBACK_MAX_LEN:
+ print(
+ f"WARNING: callback path for deadline {row[0]} exceeds
"
+ f"{_CALLBACK_MAX_LEN} chars and will be truncated."
+ )
+ path = path[:_CALLBACK_MAX_LEN]
+ kwargs = cb_inner.get("kwargs", {})
+ if not isinstance(kwargs, dict):
+ print(
+ f"WARNING: kwargs for deadline {row[0]} is not a dict "
+ f"(type={type(kwargs).__name__}); resetting to empty
dict."
+ )
+ kwargs = {}
+ batch.append({"row_id": row[0], "old_callback": path,
"old_kwargs": kwargs})
+
+ conn.execute(
+ sa.update(deadline_write)
+ .where(deadline_write.c.id == sa.bindparam("row_id"))
+ .values(
+ callback_old=sa.bindparam("old_callback"),
+ callback_kwargs=sa.bindparam("old_kwargs"),
+ ),
+ batch,
+ )
+
+ if len(rows) < batch_size:
+ break
+
+ with op.batch_alter_table("deadline", schema=None) as batch_op:
+ batch_op.drop_column("callback")
+ batch_op.alter_column(
+ "callback_old",
+ new_column_name="callback",
+ existing_type=sa.String(500),
+ nullable=False,
+ )
diff --git
a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
index 06d0f7b43ab..0cab3d57c25 100644
---
a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
+++
b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
@@ -69,8 +69,8 @@ def _upgrade_postgresql(conn, batch_size):
d.id AS deadline_id,
gen_random_uuid() AS callback_id,
COALESCE(dr.dag_id, '') AS dag_id,
- d.callback::jsonb->'__data__'->>'path' AS cb_path,
- d.callback::jsonb->'__data__'->'kwargs' AS cb_kwargs,
+ COALESCE(d.callback::jsonb->'__data__'->>'path', '')
AS cb_path,
+
COALESCE(NULLIF(d.callback::jsonb->'__data__'->'kwargs', 'null'::jsonb),
'{}'::jsonb) AS cb_kwargs,
CASE
WHEN d.callback_state IN (:state_success,
:state_failed) THEN d.callback_state
ELSE :state_pending
@@ -177,6 +177,7 @@ def _upgrade_mysql_sqlite(conn, batch_size):
)
batch_num = 0
+ null_callback_count = 0
while True:
batch_num += 1
batch = conn.execute(
@@ -199,11 +200,23 @@ def _upgrade_mysql_sqlite(conn, batch_size):
for row in batch:
callback_id = uuid6.uuid7()
- cb = row.callback if isinstance(row.callback, dict) else
json.loads(row.callback)
- cb_inner = cb.get("__data__", cb)
+ raw_cb = row.callback
+ if raw_cb is None:
+ null_callback_count += 1
+ cb = {}
+ elif isinstance(raw_cb, dict):
+ cb = raw_cb
+ else:
+ cb = json.loads(raw_cb)
+ cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else {}
+ if not isinstance(cb_inner, dict):
+ cb_inner = {}
+ kwargs = cb_inner.get("kwargs", {})
+ if not isinstance(kwargs, dict):
+ kwargs = {}
cb_data = {
- "path": cb_inner.get("path", ""),
- "kwargs": cb_inner.get("kwargs", {}),
+ "path": cb_inner.get("path", "") or "",
+ "kwargs": kwargs,
"prefix": _CALLBACK_METRICS_PREFIX,
"dag_id": row.dag_id or "",
}
@@ -237,6 +250,12 @@ def _upgrade_mysql_sqlite(conn, batch_size):
)
print(f"Migrated {len(batch)} deadline records in batch {batch_num}")
+ if null_callback_count:
+ print(
+ f"WARNING: {null_callback_count} deadline rows had NULL callback "
+ "(legacy 0080 data); migrated with empty envelope."
+ )
+
def upgrade():
"""Replace Deadline table's inline callback fields with callback_id
foreign key."""
diff --git
a/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py
b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py
new file mode 100644
index 00000000000..c777d8418b1
--- /dev/null
+++
b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Regression tests for migration 0080 (808787349f22):
+upgrade() and downgrade() must correctly migrate existing deadline rows
+without raising NotNullViolation.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import json
+import uuid
+from pathlib import Path
+from unittest import mock
+
+import sqlalchemy as sa
+from alembic.migration import MigrationContext
+from alembic.operations import Operations
+
+from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH
+
+# Migration filenames start with a digit so they cannot be imported via the
+# normal import system; load the module by file path instead.
+_MIGRATION_PATH = (
+ Path(AIRFLOW_CORE_SOURCES_PATH)
+ /
"airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py"
+)
+_spec = importlib.util.spec_from_file_location("migration_0080",
_MIGRATION_PATH)
+_migration = importlib.util.module_from_spec(_spec) # type: ignore[arg-type]
+_spec.loader.exec_module(_migration) # type: ignore[union-attr]
+
+upgrade = _migration.upgrade
+downgrade = _migration.downgrade
+_ASYNC_CALLBACK_CLASSNAME = _migration._ASYNC_CALLBACK_CLASSNAME
+
+_PRE_0080_DDL = """
+CREATE TABLE deadline (
+ id TEXT PRIMARY KEY,
+ dagrun_id INTEGER NOT NULL,
+ deadline_time TEXT NOT NULL,
+ callback TEXT NOT NULL,
+ callback_kwargs TEXT,
+ created_at TEXT,
+ last_updated_at TEXT
+)
+"""
+
+_POST_0080_DDL = """
+CREATE TABLE deadline (
+ id TEXT PRIMARY KEY,
+ dagrun_id INTEGER NOT NULL,
+ deadline_time TEXT NOT NULL,
+ callback TEXT NOT NULL,
+ created_at TEXT,
+ last_updated_at TEXT
+)
+"""
+
+
+def _make_engine_pre_0080():
+ """Return an in-memory SQLite engine with the pre-0080 deadline schema."""
+ engine = sa.create_engine("sqlite:///:memory:")
+ with engine.connect() as conn:
+ conn.execute(sa.text(_PRE_0080_DDL))
+ conn.commit()
+ return engine
+
+
+def _make_engine_post_0080():
+ """Return an in-memory SQLite engine with the post-0080 deadline schema."""
+ engine = sa.create_engine("sqlite:///:memory:")
+ with engine.connect() as conn:
+ conn.execute(sa.text(_POST_0080_DDL))
+ conn.commit()
+ return engine
+
+
+def _run_upgrade(engine):
+ # alembic.context is a proxy that is only populated when running through
+ # Alembic's full migration runner (alembic upgrade). When calling the
+ # migration function directly in a test we must mock it so that the
+ # is_offline_mode() guard does not raise AttributeError.
+ with engine.begin() as conn:
+ with Operations.context(MigrationContext.configure(conn)):
+ with mock.patch.object(_migration, "context") as mock_ctx:
+ mock_ctx.is_offline_mode.return_value = False
+ upgrade()
+
+
+def _run_downgrade(engine):
+ with engine.begin() as conn:
+ with Operations.context(MigrationContext.configure(conn)):
+ with mock.patch.object(_migration, "context") as mock_ctx:
+ mock_ctx.is_offline_mode.return_value = False
+ downgrade()
+
+
+def _read_deadline(engine):
+ with engine.connect() as conn:
+ return conn.execute(sa.text("SELECT * FROM deadline")).mappings().all()
+
+
+class TestMigration0080Upgrade:
+ def test_upgrade_empty_table(self):
+ """Upgrade on an empty table must not raise."""
+ engine = _make_engine_pre_0080()
+ _run_upgrade(engine)
+ rows = _read_deadline(engine)
+ assert rows == []
+
+ def test_upgrade_migrates_existing_row(self):
+ """Upgrade converts VARCHAR callback + JSON kwargs to the expected
JSON envelope."""
+ engine = _make_engine_pre_0080()
+ row_id = str(uuid.uuid4())
+ with engine.begin() as conn:
+ conn.execute(
+ sa.text(
+ "INSERT INTO deadline (id, dagrun_id, deadline_time,
callback, callback_kwargs)"
+ " VALUES (:id, 1, '2025-01-01', :cb, :kw)"
+ ),
+ {"id": row_id, "cb": "mymodule.my_callback", "kw":
json.dumps({"key": "val"})},
+ )
+
+ _run_upgrade(engine)
+
+ rows = _read_deadline(engine)
+ assert len(rows) == 1
+ cb = rows[0]["callback"]
+ if isinstance(cb, str):
+ cb = json.loads(cb)
+ assert cb["__classname__"] == _ASYNC_CALLBACK_CLASSNAME
+ assert cb["__version__"] == 0
+ assert cb["__data__"]["path"] == "mymodule.my_callback"
+ assert cb["__data__"]["kwargs"] == {"key": "val"}
+ assert "callback_kwargs" not in rows[0]
+
+ def test_upgrade_null_kwargs_defaults_to_empty_dict(self):
+ """Upgrade with NULL callback_kwargs must produce an empty dict in the
envelope."""
+ engine = _make_engine_pre_0080()
+ row_id = str(uuid.uuid4())
+ with engine.begin() as conn:
+ conn.execute(
+ sa.text(
+ "INSERT INTO deadline (id, dagrun_id, deadline_time,
callback, callback_kwargs)"
+ " VALUES (:id, 1, '2025-01-01', :cb, NULL)"
+ ),
+ {"id": row_id, "cb": "mymodule.my_callback"},
+ )
+
+ _run_upgrade(engine)
+
+ rows = _read_deadline(engine)
+ cb = rows[0]["callback"]
+ if isinstance(cb, str):
+ cb = json.loads(cb)
+ assert cb["__data__"]["kwargs"] == {}
+
+ def test_upgrade_exact_batch_boundary(self, monkeypatch):
+ """Rows == batch_size must force a second iteration that returns 0
rows and exits cleanly."""
+ # Force a small batch_size so 2 inserted rows == batch_size exactly.
+ monkeypatch.setattr(_migration.conf, "getint", lambda *a, **kw: 2)
+ engine = _make_engine_pre_0080()
+ with engine.begin() as conn:
+ for i in range(2):
+ conn.execute(
+ sa.text(
+ "INSERT INTO deadline (id, dagrun_id, deadline_time,
callback, callback_kwargs)"
+ " VALUES (:id, 1, '2025-01-01', :cb, :kw)"
+ ),
+ {"id": str(uuid.uuid4()), "cb": f"mod.cb_{i}", "kw":
json.dumps({"i": i})},
+ )
+
+ _run_upgrade(engine)
+
+ rows = _read_deadline(engine)
+ assert len(rows) == 2
+ paths = sorted(
+ (json.loads(r["callback"]) if isinstance(r["callback"], str) else
r["callback"])["__data__"][
+ "path"
+ ]
+ for r in rows
+ )
+ assert paths == ["mod.cb_0", "mod.cb_1"]
+
+
+class TestMigration0080Downgrade:
+ def test_downgrade_empty_table(self):
+ """Downgrade on an empty table must not raise."""
+ engine = _make_engine_post_0080()
+ _run_downgrade(engine)
+ rows = _read_deadline(engine)
+ assert rows == []
+
+ def test_downgrade_restores_existing_row(self):
+ """Downgrade extracts path and kwargs back from the JSON envelope."""
+ engine = _make_engine_post_0080()
+ row_id = str(uuid.uuid4())
+ callback_json = json.dumps(
+ {
+ "__data__": {"path": "mymodule.my_callback", "kwargs": {"key":
"val"}},
+ "__classname__": _ASYNC_CALLBACK_CLASSNAME,
+ "__version__": 0,
+ }
+ )
+ with engine.begin() as conn:
+ conn.execute(
+ sa.text(
+ "INSERT INTO deadline (id, dagrun_id, deadline_time,
callback)"
+ " VALUES (:id, 1, '2025-01-01', :cb)"
+ ),
+ {"id": row_id, "cb": callback_json},
+ )
+
+ _run_downgrade(engine)
+
+ rows = _read_deadline(engine)
+ assert len(rows) == 1
+ assert rows[0]["callback"] == "mymodule.my_callback"
+ kw = rows[0]["callback_kwargs"]
+ if isinstance(kw, str):
+ kw = json.loads(kw)
+ assert kw == {"key": "val"}
+
+
+class TestMigration0080RoundTrip:
+ def test_round_trip_preserves_data(self):
+ """Upgrade followed by downgrade preserves the original callback
path."""
+ engine = _make_engine_pre_0080()
+ row_id = str(uuid.uuid4())
+ original_path = "mymodule.my_callback"
+ original_kwargs = {"x": 1}
+
+ with engine.begin() as conn:
+ conn.execute(
+ sa.text(
+ "INSERT INTO deadline (id, dagrun_id, deadline_time,
callback, callback_kwargs)"
+ " VALUES (:id, 1, '2025-01-01', :cb, :kw)"
+ ),
+ {"id": row_id, "cb": original_path, "kw":
json.dumps(original_kwargs)},
+ )
+
+ _run_upgrade(engine)
+ _run_downgrade(engine)
+
+ rows = _read_deadline(engine)
+ assert len(rows) == 1
+ assert rows[0]["callback"] == original_path
+ kw = rows[0]["callback_kwargs"]
+ if isinstance(kw, str):
+ kw = json.loads(kw)
+ assert kw == original_kwargs
diff --git
a/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py
b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py
new file mode 100644
index 00000000000..3d848bf843a
--- /dev/null
+++
b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Regression tests for migration 0094 (e812941398f4).
+
+These tests focus on the defensive NULL-callback path: legacy MySQL
+deployments that ran the original (buggy) 0080 left ``deadline.callback``
+rows as NULL. 0094 must heal those rows instead of crashing on
+``json.loads(None)``.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import json
+import uuid
+from pathlib import Path
+
+import sqlalchemy as sa
+
+from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH
+
+_MIGRATION_PATH = (
+ Path(AIRFLOW_CORE_SOURCES_PATH)
+ /
"airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py"
+)
+_spec = importlib.util.spec_from_file_location("migration_0094",
_MIGRATION_PATH)
+_migration = importlib.util.module_from_spec(_spec) # type: ignore[arg-type]
+_spec.loader.exec_module(_migration) # type: ignore[union-attr]
+
+
+# Minimal post-0080 / pre-0094 schema. 0094 adds ``missed`` and ``callback_id``
+# itself before invoking ``_upgrade_mysql_sqlite``; we mimic that here so we
+# can call the inner helper directly without driving the full alembic chain.
+_POST_0080_DDL = [
+ """
+ CREATE TABLE dag_run (
+ id INTEGER PRIMARY KEY,
+ dag_id TEXT NOT NULL
+ )
+ """,
+ """
+ CREATE TABLE deadline (
+ id TEXT PRIMARY KEY,
+ dagrun_id INTEGER NOT NULL,
+ deadline_time TEXT NOT NULL,
+ callback TEXT,
+ callback_state TEXT,
+ trigger_id INTEGER,
+ callback_id TEXT,
+ missed BOOLEAN
+ )
+ """,
+ """
+ CREATE TABLE callback (
+ id TEXT PRIMARY KEY,
+ type TEXT NOT NULL,
+ fetch_method TEXT NOT NULL,
+ data TEXT NOT NULL,
+ state TEXT NOT NULL,
+ priority_weight INTEGER NOT NULL,
+ created_at TEXT NOT NULL
+ )
+ """,
+]
+
+
+def _make_engine():
+ engine = sa.create_engine("sqlite:///:memory:")
+ with engine.connect() as conn:
+ for ddl in _POST_0080_DDL:
+ conn.execute(sa.text(ddl))
+ conn.commit()
+ return engine
+
+
+def _insert_dagrun(conn, dagrun_id: int = 1, dag_id: str = "test_dag"):
+ conn.execute(
+ sa.text("INSERT INTO dag_run (id, dag_id) VALUES (:id, :dag_id)"),
+ {"id": dagrun_id, "dag_id": dag_id},
+ )
+
+
+def _insert_deadline(conn, deadline_id: str, callback, callback_state: str |
None = None):
+ conn.execute(
+ sa.text(
+ "INSERT INTO deadline (id, dagrun_id, deadline_time, callback,
callback_state)"
+ " VALUES (:id, 1, '2025-01-01', :cb, :state)"
+ ),
+ {"id": deadline_id, "cb": callback, "state": callback_state},
+ )
+
+
+class TestMigration0094NullCallbackRepair:
+ """A NULL callback row from a buggy 0080 must not crash 0094's upgrade."""
+
+ def test_null_callback_does_not_crash(self):
+ engine = _make_engine()
+ # `_upgrade_mysql_sqlite` declares ``id`` as ``sa.Uuid()``; on SQLite the
+ # write path emits the hex (no-dash) form. Insert IDs in that same form so
+ # the UPDATE in the migration loop matches the row we created.
+ deadline_id = uuid.uuid4().hex
+ with engine.begin() as conn:
+ _insert_dagrun(conn)
+ _insert_deadline(conn, deadline_id, callback=None)
+
+ # _upgrade_mysql_sqlite reads from `deadline` and writes to `callback`;
+ # it does not depend on alembic's batch_alter_table prelude.
+ with engine.begin() as conn:
+ _migration._upgrade_mysql_sqlite(conn, batch_size=10)
+
+ with engine.connect() as conn:
+ deadline_rows = conn.execute(sa.text("SELECT * FROM
deadline")).mappings().all()
+ callback_rows = conn.execute(sa.text("SELECT * FROM
callback")).mappings().all()
+
+ assert len(deadline_rows) == 1
+ assert len(callback_rows) == 1
+ assert deadline_rows[0]["callback_id"] == callback_rows[0]["id"]
+ assert deadline_rows[0]["missed"] == 0 # SQLite: False -> 0
+
+ cb_data = json.loads(callback_rows[0]["data"])
+ assert cb_data["path"] == ""
+ assert cb_data["kwargs"] == {}
+ assert cb_data["dag_id"] == "test_dag"
+
+ def test_mixed_null_and_valid_callbacks(self):
+ """A batch with both NULL and well-formed rows must migrate both."""
+ engine = _make_engine()
+ null_id = uuid.uuid4().hex
+ valid_id = uuid.uuid4().hex
+ valid_callback = json.dumps(
+ {
+ "__data__": {"path": "mymodule.cb", "kwargs": {"k": "v"}},
+ "__classname__":
"airflow.sdk.definitions.deadline.AsyncCallback",
+ "__version__": 0,
+ }
+ )
+ with engine.begin() as conn:
+ _insert_dagrun(conn)
+ _insert_deadline(conn, null_id, callback=None)
+ _insert_deadline(conn, valid_id, callback=valid_callback)
+
+ with engine.begin() as conn:
+ _migration._upgrade_mysql_sqlite(conn, batch_size=10)
+
+ with engine.connect() as conn:
+ rows = (
+ conn.execute(
+ sa.text(
+ "SELECT d.id AS deadline_id, c.data"
+ " FROM deadline d JOIN callback c ON d.callback_id =
c.id"
+ )
+ )
+ .mappings()
+ .all()
+ )
+
+ by_id = {r["deadline_id"]: json.loads(r["data"]) for r in rows}
+ assert by_id[null_id]["path"] == ""
+ assert by_id[null_id]["kwargs"] == {}
+ assert by_id[valid_id]["path"] == "mymodule.cb"
+ assert by_id[valid_id]["kwargs"] == {"k": "v"}