This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new e33c99f1500 [v3-2-test] fix: migrate existing deadline rows in migration 0080 upgrade and downgrade (#66016) (#67129)
e33c99f1500 is described below

commit e33c99f15004ee40d03599b26215845031eaceaf
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 19 10:04:49 2026 +0530

    [v3-2-test] fix: migrate existing deadline rows in migration 0080 upgrade and downgrade (#66016) (#67129)
    
    [v3-2-test] fix: migrate existing deadline rows in migration 0080 upgrade and downgrade (#66016) (#67129)
    ---------
    (cherry picked from commit c8a6c55cac8031c4131b5b0e064c45cb232b9966)
    
    Co-authored-by: Rahul Vats <[email protected]>
---
 .../0080_3_1_0_modify_deadline_callback_schema.py  | 195 ++++++++++++++-
 ...0_replace_deadline_inline_callback_with_fkey.py |  31 ++-
 .../test_0080_deadline_callback_migration.py       | 268 +++++++++++++++++++++
 .../test_0094_deadline_callback_migration.py       | 178 ++++++++++++++
 4 files changed, 662 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
index adb3354512d..23b00e1a1eb 100644
--- a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
+++ b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
@@ -27,8 +27,13 @@ Create Date: 2025-07-31 19:35:53.150465
 
 from __future__ import annotations
 
+import json
+from textwrap import dedent
+
 import sqlalchemy as sa
-from alembic import op
+from alembic import context, op
+
+from airflow.configuration import conf
 
 # revision identifiers, used by Alembic.
 revision = "808787349f22"
@@ -38,17 +43,199 @@ depends_on = None
 airflow_version = "3.1.0"
 
 
+_ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback"
+# Maximum length of the callback VARCHAR column in the pre-0080 schema.
+_CALLBACK_MAX_LEN = 500
+
+
 def upgrade():
     """Replace deadline table's string callback and JSON callback_kwargs with 
JSON callback."""
+    if context.is_offline_mode():
+        print(
+            dedent("""
+            ------------
+            --  WARNING: Unable to migrate the data in the deadline table
+            --  while in offline mode!  All rows in the deadline table will
+            --  be deleted.
+            ------------
+            """)
+        )
+        op.execute("DELETE FROM deadline")
+        with op.batch_alter_table("deadline", schema=None) as batch_op:
+            batch_op.drop_column("callback")
+            batch_op.drop_column("callback_kwargs")
+            batch_op.add_column(sa.Column("callback", sa.JSON(), 
nullable=False))
+        return
+
+    conn = op.get_bind()
+    batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+    # Add the destination column alongside the existing ones so we can migrate
+    # in batches without loading the whole table into memory at once.
+    with op.batch_alter_table("deadline", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("callback_new", sa.JSON(), 
nullable=True))
+
+    deadline_read = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback"),
+        sa.column("callback_kwargs", sa.JSON()),
+        sa.column("callback_new", sa.JSON()),
+    )
+    deadline_write = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback_new", sa.JSON()),
+    )
+
+    while True:
+        rows = conn.execute(
+            sa.select(
+                deadline_read.c.id,
+                deadline_read.c.callback,
+                deadline_read.c.callback_kwargs,
+            )
+            .where(deadline_read.c.callback_new.is_(None))
+            .limit(batch_size)
+        ).fetchall()
+
+        if not rows:
+            break
+
+        batch = []
+        for row in rows:
+            path = row[1] or ""
+            kwargs = row[2]
+            if isinstance(kwargs, str):
+                kwargs = json.loads(kwargs) if kwargs else {}
+            if not isinstance(kwargs, dict):
+                kwargs = {}
+            batch.append(
+                {
+                    "row_id": row[0],
+                    "new_callback": {
+                        "__data__": {"path": path, "kwargs": kwargs},
+                        "__classname__": _ASYNC_CALLBACK_CLASSNAME,
+                        "__version__": 0,
+                    },
+                }
+            )
+
+        conn.execute(
+            sa.update(deadline_write)
+            .where(deadline_write.c.id == sa.bindparam("row_id"))
+            .values(callback_new=sa.bindparam("new_callback")),
+            batch,
+        )
+
+        if len(rows) < batch_size:
+            break
+
     with op.batch_alter_table("deadline", schema=None) as batch_op:
         batch_op.drop_column("callback")
         batch_op.drop_column("callback_kwargs")
-        batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False))
+        batch_op.alter_column(
+            "callback_new",
+            new_column_name="callback",
+            existing_type=sa.JSON(),
+            nullable=False,
+        )
 
 
 def downgrade():
     """Replace deadline table's JSON callback with string callback and JSON 
callback_kwargs."""
+    if context.is_offline_mode():
+        print(
+            dedent("""
+            ------------
+            --  WARNING: Unable to migrate the data in the deadline table
+            --  while in offline mode!  All rows in the deadline table will
+            --  be deleted.
+            ------------
+            """)
+        )
+        op.execute("DELETE FROM deadline")
+        with op.batch_alter_table("deadline", schema=None) as batch_op:
+            batch_op.drop_column("callback")
+            batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), 
nullable=True))
+            batch_op.add_column(sa.Column("callback", sa.String(length=500), 
nullable=False))
+        return
+
+    conn = op.get_bind()
+    batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+    # Add the restored columns alongside the existing JSON callback so we can
+    # back-fill in batches before dropping the JSON column.
     with op.batch_alter_table("deadline", schema=None) as batch_op:
-        batch_op.drop_column("callback")
+        batch_op.add_column(sa.Column("callback_old", sa.String(length=500), 
nullable=True))
         batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), 
nullable=True))
-        batch_op.add_column(sa.Column("callback", sa.String(length=500), 
nullable=False))
+
+    deadline_read = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback", sa.JSON()),
+        sa.column("callback_old", sa.String(500)),
+    )
+    deadline_write = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback_old", sa.String(500)),
+        sa.column("callback_kwargs", sa.JSON()),
+    )
+
+    while True:
+        rows = conn.execute(
+            sa.select(deadline_read.c.id, deadline_read.c.callback)
+            .where(deadline_read.c.callback_old.is_(None))
+            .limit(batch_size)
+        ).fetchall()
+
+        if not rows:
+            break
+
+        batch = []
+        for row in rows:
+            cb = row[1]
+            if cb is None:
+                path, kwargs = "", {}
+            else:
+                if isinstance(cb, str):
+                    cb = json.loads(cb)
+                cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else 
{}
+                path = cb_inner.get("path", "")
+                if len(path) > _CALLBACK_MAX_LEN:
+                    print(
+                        f"WARNING: callback path for deadline {row[0]} exceeds 
"
+                        f"{_CALLBACK_MAX_LEN} chars and will be truncated."
+                    )
+                    path = path[:_CALLBACK_MAX_LEN]
+                kwargs = cb_inner.get("kwargs", {})
+                if not isinstance(kwargs, dict):
+                    print(
+                        f"WARNING: kwargs for deadline {row[0]} is not a dict "
+                        f"(type={type(kwargs).__name__}); resetting to empty 
dict."
+                    )
+                    kwargs = {}
+            batch.append({"row_id": row[0], "old_callback": path, 
"old_kwargs": kwargs})
+
+        conn.execute(
+            sa.update(deadline_write)
+            .where(deadline_write.c.id == sa.bindparam("row_id"))
+            .values(
+                callback_old=sa.bindparam("old_callback"),
+                callback_kwargs=sa.bindparam("old_kwargs"),
+            ),
+            batch,
+        )
+
+        if len(rows) < batch_size:
+            break
+
+    with op.batch_alter_table("deadline", schema=None) as batch_op:
+        batch_op.drop_column("callback")
+        batch_op.alter_column(
+            "callback_old",
+            new_column_name="callback",
+            existing_type=sa.String(500),
+            nullable=False,
+        )
diff --git a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
index 06d0f7b43ab..0cab3d57c25 100644
--- a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
+++ b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
@@ -69,8 +69,8 @@ def _upgrade_postgresql(conn, batch_size):
                         d.id AS deadline_id,
                         gen_random_uuid() AS callback_id,
                         COALESCE(dr.dag_id, '') AS dag_id,
-                        d.callback::jsonb->'__data__'->>'path' AS cb_path,
-                        d.callback::jsonb->'__data__'->'kwargs' AS cb_kwargs,
+                        COALESCE(d.callback::jsonb->'__data__'->>'path', '') 
AS cb_path,
+                        
COALESCE(NULLIF(d.callback::jsonb->'__data__'->'kwargs', 'null'::jsonb), 
'{}'::jsonb) AS cb_kwargs,
                         CASE
                             WHEN d.callback_state IN (:state_success, 
:state_failed) THEN d.callback_state
                             ELSE :state_pending
@@ -177,6 +177,7 @@ def _upgrade_mysql_sqlite(conn, batch_size):
     )
 
     batch_num = 0
+    null_callback_count = 0
     while True:
         batch_num += 1
         batch = conn.execute(
@@ -199,11 +200,23 @@ def _upgrade_mysql_sqlite(conn, batch_size):
 
         for row in batch:
             callback_id = uuid6.uuid7()
-            cb = row.callback if isinstance(row.callback, dict) else 
json.loads(row.callback)
-            cb_inner = cb.get("__data__", cb)
+            raw_cb = row.callback
+            if raw_cb is None:
+                null_callback_count += 1
+                cb = {}
+            elif isinstance(raw_cb, dict):
+                cb = raw_cb
+            else:
+                cb = json.loads(raw_cb)
+            cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else {}
+            if not isinstance(cb_inner, dict):
+                cb_inner = {}
+            kwargs = cb_inner.get("kwargs", {})
+            if not isinstance(kwargs, dict):
+                kwargs = {}
             cb_data = {
-                "path": cb_inner.get("path", ""),
-                "kwargs": cb_inner.get("kwargs", {}),
+                "path": cb_inner.get("path", "") or "",
+                "kwargs": kwargs,
                 "prefix": _CALLBACK_METRICS_PREFIX,
                 "dag_id": row.dag_id or "",
             }
@@ -237,6 +250,12 @@ def _upgrade_mysql_sqlite(conn, batch_size):
         )
         print(f"Migrated {len(batch)} deadline records in batch {batch_num}")
 
+    if null_callback_count:
+        print(
+            f"WARNING: {null_callback_count} deadline rows had NULL callback "
+            "(legacy 0080 data); migrated with empty envelope."
+        )
+
 
 def upgrade():
     """Replace Deadline table's inline callback fields with callback_id 
foreign key."""
diff --git a/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py
new file mode 100644
index 00000000000..c777d8418b1
--- /dev/null
+++ b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Regression tests for migration 0080 (808787349f22):
+upgrade() and downgrade() must correctly migrate existing deadline rows
+without raising NotNullViolation.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import json
+import uuid
+from pathlib import Path
+from unittest import mock
+
+import sqlalchemy as sa
+from alembic.migration import MigrationContext
+from alembic.operations import Operations
+
+from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH
+
+# Migration filenames start with a digit so they cannot be imported via the
+# normal import system; load the module by file path instead.
+_MIGRATION_PATH = (
+    Path(AIRFLOW_CORE_SOURCES_PATH)
+    / 
"airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py"
+)
+_spec = importlib.util.spec_from_file_location("migration_0080", 
_MIGRATION_PATH)
+_migration = importlib.util.module_from_spec(_spec)  # type: ignore[arg-type]
+_spec.loader.exec_module(_migration)  # type: ignore[union-attr]
+
+upgrade = _migration.upgrade
+downgrade = _migration.downgrade
+_ASYNC_CALLBACK_CLASSNAME = _migration._ASYNC_CALLBACK_CLASSNAME
+
+_PRE_0080_DDL = """
+CREATE TABLE deadline (
+    id          TEXT        PRIMARY KEY,
+    dagrun_id   INTEGER     NOT NULL,
+    deadline_time TEXT      NOT NULL,
+    callback    TEXT        NOT NULL,
+    callback_kwargs TEXT,
+    created_at  TEXT,
+    last_updated_at TEXT
+)
+"""
+
+_POST_0080_DDL = """
+CREATE TABLE deadline (
+    id          TEXT        PRIMARY KEY,
+    dagrun_id   INTEGER     NOT NULL,
+    deadline_time TEXT      NOT NULL,
+    callback    TEXT        NOT NULL,
+    created_at  TEXT,
+    last_updated_at TEXT
+)
+"""
+
+
+def _make_engine_pre_0080():
+    """Return an in-memory SQLite engine with the pre-0080 deadline schema."""
+    engine = sa.create_engine("sqlite:///:memory:")
+    with engine.connect() as conn:
+        conn.execute(sa.text(_PRE_0080_DDL))
+        conn.commit()
+    return engine
+
+
+def _make_engine_post_0080():
+    """Return an in-memory SQLite engine with the post-0080 deadline schema."""
+    engine = sa.create_engine("sqlite:///:memory:")
+    with engine.connect() as conn:
+        conn.execute(sa.text(_POST_0080_DDL))
+        conn.commit()
+    return engine
+
+
+def _run_upgrade(engine):
+    # alembic.context is a proxy that is only populated when running through
+    # Alembic's full migration runner (alembic upgrade).  When calling the
+    # migration function directly in a test we must mock it so that the
+    # is_offline_mode() guard does not raise AttributeError.
+    with engine.begin() as conn:
+        with Operations.context(MigrationContext.configure(conn)):
+            with mock.patch.object(_migration, "context") as mock_ctx:
+                mock_ctx.is_offline_mode.return_value = False
+                upgrade()
+
+
+def _run_downgrade(engine):
+    with engine.begin() as conn:
+        with Operations.context(MigrationContext.configure(conn)):
+            with mock.patch.object(_migration, "context") as mock_ctx:
+                mock_ctx.is_offline_mode.return_value = False
+                downgrade()
+
+
+def _read_deadline(engine):
+    with engine.connect() as conn:
+        return conn.execute(sa.text("SELECT * FROM deadline")).mappings().all()
+
+
+class TestMigration0080Upgrade:
+    def test_upgrade_empty_table(self):
+        """Upgrade on an empty table must not raise."""
+        engine = _make_engine_pre_0080()
+        _run_upgrade(engine)
+        rows = _read_deadline(engine)
+        assert rows == []
+
+    def test_upgrade_migrates_existing_row(self):
+        """Upgrade converts VARCHAR callback + JSON kwargs to the expected 
JSON envelope."""
+        engine = _make_engine_pre_0080()
+        row_id = str(uuid.uuid4())
+        with engine.begin() as conn:
+            conn.execute(
+                sa.text(
+                    "INSERT INTO deadline (id, dagrun_id, deadline_time, 
callback, callback_kwargs)"
+                    " VALUES (:id, 1, '2025-01-01', :cb, :kw)"
+                ),
+                {"id": row_id, "cb": "mymodule.my_callback", "kw": 
json.dumps({"key": "val"})},
+            )
+
+        _run_upgrade(engine)
+
+        rows = _read_deadline(engine)
+        assert len(rows) == 1
+        cb = rows[0]["callback"]
+        if isinstance(cb, str):
+            cb = json.loads(cb)
+        assert cb["__classname__"] == _ASYNC_CALLBACK_CLASSNAME
+        assert cb["__version__"] == 0
+        assert cb["__data__"]["path"] == "mymodule.my_callback"
+        assert cb["__data__"]["kwargs"] == {"key": "val"}
+        assert "callback_kwargs" not in rows[0]
+
+    def test_upgrade_null_kwargs_defaults_to_empty_dict(self):
+        """Upgrade with NULL callback_kwargs must produce an empty dict in the 
envelope."""
+        engine = _make_engine_pre_0080()
+        row_id = str(uuid.uuid4())
+        with engine.begin() as conn:
+            conn.execute(
+                sa.text(
+                    "INSERT INTO deadline (id, dagrun_id, deadline_time, 
callback, callback_kwargs)"
+                    " VALUES (:id, 1, '2025-01-01', :cb, NULL)"
+                ),
+                {"id": row_id, "cb": "mymodule.my_callback"},
+            )
+
+        _run_upgrade(engine)
+
+        rows = _read_deadline(engine)
+        cb = rows[0]["callback"]
+        if isinstance(cb, str):
+            cb = json.loads(cb)
+        assert cb["__data__"]["kwargs"] == {}
+
+    def test_upgrade_exact_batch_boundary(self, monkeypatch):
+        """Rows == batch_size must force a second iteration that returns 0 
rows and exits cleanly."""
+        # Force a small batch_size so 2 inserted rows == batch_size exactly.
+        monkeypatch.setattr(_migration.conf, "getint", lambda *a, **kw: 2)
+        engine = _make_engine_pre_0080()
+        with engine.begin() as conn:
+            for i in range(2):
+                conn.execute(
+                    sa.text(
+                        "INSERT INTO deadline (id, dagrun_id, deadline_time, 
callback, callback_kwargs)"
+                        " VALUES (:id, 1, '2025-01-01', :cb, :kw)"
+                    ),
+                    {"id": str(uuid.uuid4()), "cb": f"mod.cb_{i}", "kw": 
json.dumps({"i": i})},
+                )
+
+        _run_upgrade(engine)
+
+        rows = _read_deadline(engine)
+        assert len(rows) == 2
+        paths = sorted(
+            (json.loads(r["callback"]) if isinstance(r["callback"], str) else 
r["callback"])["__data__"][
+                "path"
+            ]
+            for r in rows
+        )
+        assert paths == ["mod.cb_0", "mod.cb_1"]
+
+
+class TestMigration0080Downgrade:
+    def test_downgrade_empty_table(self):
+        """Downgrade on an empty table must not raise."""
+        engine = _make_engine_post_0080()
+        _run_downgrade(engine)
+        rows = _read_deadline(engine)
+        assert rows == []
+
+    def test_downgrade_restores_existing_row(self):
+        """Downgrade extracts path and kwargs back from the JSON envelope."""
+        engine = _make_engine_post_0080()
+        row_id = str(uuid.uuid4())
+        callback_json = json.dumps(
+            {
+                "__data__": {"path": "mymodule.my_callback", "kwargs": {"key": 
"val"}},
+                "__classname__": _ASYNC_CALLBACK_CLASSNAME,
+                "__version__": 0,
+            }
+        )
+        with engine.begin() as conn:
+            conn.execute(
+                sa.text(
+                    "INSERT INTO deadline (id, dagrun_id, deadline_time, 
callback)"
+                    " VALUES (:id, 1, '2025-01-01', :cb)"
+                ),
+                {"id": row_id, "cb": callback_json},
+            )
+
+        _run_downgrade(engine)
+
+        rows = _read_deadline(engine)
+        assert len(rows) == 1
+        assert rows[0]["callback"] == "mymodule.my_callback"
+        kw = rows[0]["callback_kwargs"]
+        if isinstance(kw, str):
+            kw = json.loads(kw)
+        assert kw == {"key": "val"}
+
+
+class TestMigration0080RoundTrip:
+    def test_round_trip_preserves_data(self):
+        """Upgrade followed by downgrade preserves the original callback 
path."""
+        engine = _make_engine_pre_0080()
+        row_id = str(uuid.uuid4())
+        original_path = "mymodule.my_callback"
+        original_kwargs = {"x": 1}
+
+        with engine.begin() as conn:
+            conn.execute(
+                sa.text(
+                    "INSERT INTO deadline (id, dagrun_id, deadline_time, 
callback, callback_kwargs)"
+                    " VALUES (:id, 1, '2025-01-01', :cb, :kw)"
+                ),
+                {"id": row_id, "cb": original_path, "kw": 
json.dumps(original_kwargs)},
+            )
+
+        _run_upgrade(engine)
+        _run_downgrade(engine)
+
+        rows = _read_deadline(engine)
+        assert len(rows) == 1
+        assert rows[0]["callback"] == original_path
+        kw = rows[0]["callback_kwargs"]
+        if isinstance(kw, str):
+            kw = json.loads(kw)
+        assert kw == original_kwargs
diff --git a/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py
new file mode 100644
index 00000000000..3d848bf843a
--- /dev/null
+++ b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Regression tests for migration 0094 (e812941398f4).
+
+These tests focus on the defensive NULL-callback path: legacy MySQL
+deployments that ran the original (buggy) 0080 left ``deadline.callback``
+rows as NULL. 0094 must heal those rows instead of crashing on
+``json.loads(None)``.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import json
+import uuid
+from pathlib import Path
+
+import sqlalchemy as sa
+
+from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH
+
+_MIGRATION_PATH = (
+    Path(AIRFLOW_CORE_SOURCES_PATH)
+    / 
"airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py"
+)
+_spec = importlib.util.spec_from_file_location("migration_0094", 
_MIGRATION_PATH)
+_migration = importlib.util.module_from_spec(_spec)  # type: ignore[arg-type]
+_spec.loader.exec_module(_migration)  # type: ignore[union-attr]
+
+
+# Minimal post-0080 / pre-0094 schema. 0094 adds ``missed`` and ``callback_id``
+# itself before invoking ``_upgrade_mysql_sqlite``; we mimic that here so we
+# can call the inner helper directly without driving the full alembic chain.
+_POST_0080_DDL = [
+    """
+    CREATE TABLE dag_run (
+        id      INTEGER PRIMARY KEY,
+        dag_id  TEXT NOT NULL
+    )
+    """,
+    """
+    CREATE TABLE deadline (
+        id              TEXT    PRIMARY KEY,
+        dagrun_id       INTEGER NOT NULL,
+        deadline_time   TEXT    NOT NULL,
+        callback        TEXT,
+        callback_state  TEXT,
+        trigger_id      INTEGER,
+        callback_id     TEXT,
+        missed          BOOLEAN
+    )
+    """,
+    """
+    CREATE TABLE callback (
+        id              TEXT    PRIMARY KEY,
+        type            TEXT    NOT NULL,
+        fetch_method    TEXT    NOT NULL,
+        data            TEXT    NOT NULL,
+        state           TEXT    NOT NULL,
+        priority_weight INTEGER NOT NULL,
+        created_at      TEXT    NOT NULL
+    )
+    """,
+]
+
+
+def _make_engine():
+    engine = sa.create_engine("sqlite:///:memory:")
+    with engine.connect() as conn:
+        for ddl in _POST_0080_DDL:
+            conn.execute(sa.text(ddl))
+        conn.commit()
+    return engine
+
+
+def _insert_dagrun(conn, dagrun_id: int = 1, dag_id: str = "test_dag"):
+    conn.execute(
+        sa.text("INSERT INTO dag_run (id, dag_id) VALUES (:id, :dag_id)"),
+        {"id": dagrun_id, "dag_id": dag_id},
+    )
+
+
+def _insert_deadline(conn, deadline_id: str, callback, callback_state: str | 
None = None):
+    conn.execute(
+        sa.text(
+            "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, 
callback_state)"
+            " VALUES (:id, 1, '2025-01-01', :cb, :state)"
+        ),
+        {"id": deadline_id, "cb": callback, "state": callback_state},
+    )
+
+
+class TestMigration0094NullCallbackRepair:
+    """A NULL callback row from a buggy 0080 must not crash 0094's upgrade."""
+
+    def test_null_callback_does_not_crash(self):
+        engine = _make_engine()
+        # `_upgrade_mysql_sqlite` declares ``id`` as ``sa.Uuid()``; on SQLite 
the
+        # write path emits the hex (no-dash) form. Insert IDs in that same 
form so
+        # the UPDATE in the migration loop matches the row we created.
+        deadline_id = uuid.uuid4().hex
+        with engine.begin() as conn:
+            _insert_dagrun(conn)
+            _insert_deadline(conn, deadline_id, callback=None)
+
+        # _upgrade_mysql_sqlite reads from `deadline` and writes to `callback`;
+        # it does not depend on alembic's batch_alter_table prelude.
+        with engine.begin() as conn:
+            _migration._upgrade_mysql_sqlite(conn, batch_size=10)
+
+        with engine.connect() as conn:
+            deadline_rows = conn.execute(sa.text("SELECT * FROM 
deadline")).mappings().all()
+            callback_rows = conn.execute(sa.text("SELECT * FROM 
callback")).mappings().all()
+
+        assert len(deadline_rows) == 1
+        assert len(callback_rows) == 1
+        assert deadline_rows[0]["callback_id"] == callback_rows[0]["id"]
+        assert deadline_rows[0]["missed"] == 0  # SQLite: False -> 0
+
+        cb_data = json.loads(callback_rows[0]["data"])
+        assert cb_data["path"] == ""
+        assert cb_data["kwargs"] == {}
+        assert cb_data["dag_id"] == "test_dag"
+
+    def test_mixed_null_and_valid_callbacks(self):
+        """A batch with both NULL and well-formed rows must migrate both."""
+        engine = _make_engine()
+        null_id = uuid.uuid4().hex
+        valid_id = uuid.uuid4().hex
+        valid_callback = json.dumps(
+            {
+                "__data__": {"path": "mymodule.cb", "kwargs": {"k": "v"}},
+                "__classname__": 
"airflow.sdk.definitions.deadline.AsyncCallback",
+                "__version__": 0,
+            }
+        )
+        with engine.begin() as conn:
+            _insert_dagrun(conn)
+            _insert_deadline(conn, null_id, callback=None)
+            _insert_deadline(conn, valid_id, callback=valid_callback)
+
+        with engine.begin() as conn:
+            _migration._upgrade_mysql_sqlite(conn, batch_size=10)
+
+        with engine.connect() as conn:
+            rows = (
+                conn.execute(
+                    sa.text(
+                        "SELECT d.id AS deadline_id, c.data"
+                        " FROM deadline d JOIN callback c ON d.callback_id = 
c.id"
+                    )
+                )
+                .mappings()
+                .all()
+            )
+
+        by_id = {r["deadline_id"]: json.loads(r["data"]) for r in rows}
+        assert by_id[null_id]["path"] == ""
+        assert by_id[null_id]["kwargs"] == {}
+        assert by_id[valid_id]["path"] == "mymodule.cb"
+        assert by_id[valid_id]["kwargs"] == {"k": "v"}

Reply via email to