This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c8a6c55cac8 fix: migrate existing deadline rows in migration 0080 upgrade and downgrade (#66016)
c8a6c55cac8 is described below

commit c8a6c55cac8031c4131b5b0e064c45cb232b9966
Author: Rahul Vats <[email protected]>
AuthorDate: Tue May 19 00:28:14 2026 +0530

    fix: migrate existing deadline rows in migration 0080 upgrade and downgrade (#66016)
    
    * fix: migrate existing deadline rows in migration 0080 upgrade and downgrade
    
    Both the upgrade and downgrade paths of migration 0080
    (808787349f22 - Modify deadline callback schema) added NOT NULL columns
    to the deadline table without first populating them from the existing
    data, causing:
    
    * upgrade:   NotNullViolation when adding callback JSON NOT NULL to a
                 non-empty table (existing rows have no value for the new
                 column).
    * downgrade: NotNullViolation on PostgreSQL / silent NULL on MySQL when
                 adding callback VARCHAR(500) NOT NULL after dropping the
                 JSON column, crashing the subsequent 0094 upgrade with
                 json.loads(None).
    
    Fix both paths with the same pattern used by migration 0094:
    
    1. Read the existing data before dropping any of the old columns.
    2. Add the new column(s) as nullable so the DDL succeeds on all
       supported databases.
    3. Back-fill the column using a typed SA table clause (so each dialect
       handles JSON serialisation correctly).
    4. Enforce NOT NULL only after every row has a valid value.
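For illustration only, the four steps can be traced with Python's built-in sqlite3 module. This is a sketch under stated assumptions, not the migration itself: the table is a simplified stand-in, the back-fill writes a reduced `{"path", "kwargs"}` payload instead of the full envelope, and the table rebuild at the end approximates what Alembic's `batch_alter_table` does on SQLite.

```python
import json
import sqlite3

# Simplified pre-0080 stand-in: a string path plus JSON-text kwargs.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE deadline (id TEXT PRIMARY KEY, callback TEXT NOT NULL, callback_kwargs TEXT)"
)
conn.executemany(
    "INSERT INTO deadline VALUES (?, ?, ?)",
    [("a", "mod.cb_a", json.dumps({"x": 1})), ("b", "mod.cb_b", None)],
)

# Step 2: add the destination column as nullable, so the DDL cannot fail on
# rows that have no value for it yet.
conn.execute("ALTER TABLE deadline ADD COLUMN callback_new TEXT")

# Steps 1 and 3: read the old values while the old columns still exist and
# back-fill the new column (simplified payload, not the full envelope).
rows = conn.execute("SELECT id, callback, callback_kwargs FROM deadline").fetchall()
for row_id, path, raw_kwargs in rows:
    kwargs = json.loads(raw_kwargs) if raw_kwargs else {}
    payload = json.dumps({"path": path, "kwargs": kwargs})
    conn.execute("UPDATE deadline SET callback_new = ? WHERE id = ?", (payload, row_id))

# Step 4: enforce NOT NULL only once no row is missing a value. SQLite cannot
# ALTER a column to NOT NULL, so copy the data into a rebuilt table instead.
missing = conn.execute("SELECT COUNT(*) FROM deadline WHERE callback_new IS NULL").fetchone()[0]
assert missing == 0
conn.executescript(
    """
    CREATE TABLE deadline_tmp (id TEXT PRIMARY KEY, callback TEXT NOT NULL);
    INSERT INTO deadline_tmp (id, callback) SELECT id, callback_new FROM deadline;
    DROP TABLE deadline;
    ALTER TABLE deadline_tmp RENAME TO deadline;
    """
)
```

The point of the ordering is that the NOT NULL constraint is applied only after the `missing == 0` check holds, so the final DDL never sees a row without a value.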
    
    Upgrade serialises the old (path, kwargs) pair into the
    {"__data__": {"path": ..., "kwargs": ...}, "__classname__": ...,
     "__version__": 0} format expected by migration 0094.
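The serialisation can be sketched as a pure function. `build_callback_envelope` is a hypothetical helper name; its normalisation rules (a NULL path becomes "", JSON-text kwargs are decoded, non-dict kwargs become {}) follow the upgrade loop in the diff, and the classname string is copied from the migration's `_ASYNC_CALLBACK_CLASSNAME` constant.

```python
from __future__ import annotations

import json
from typing import Any

# Copied from _ASYNC_CALLBACK_CLASSNAME in the 0080 migration.
ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback"


def build_callback_envelope(path: str | None, raw_kwargs: Any) -> dict:
    """Wrap a pre-0080 (path, kwargs) pair in the envelope 0094 expects (hypothetical helper)."""
    # A NULL path becomes an empty string, as in the upgrade loop.
    path = path or ""
    # callback_kwargs may arrive as JSON text or already decoded by the driver.
    kwargs = raw_kwargs
    if isinstance(kwargs, str):
        kwargs = json.loads(kwargs) if kwargs else {}
    # Anything that is not a dict (None, lists, scalars) is reset to {}.
    if not isinstance(kwargs, dict):
        kwargs = {}
    return {
        "__data__": {"path": path, "kwargs": kwargs},
        "__classname__": ASYNC_CALLBACK_CLASSNAME,
        "__version__": 0,
    }
```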
    
    Downgrade extracts path/kwargs back from that same JSON envelope
    before restoring the VARCHAR callback column.
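A matching sketch of the downgrade extraction, including the 500-character truncation imposed by the restored VARCHAR column. `extract_path_kwargs` is a hypothetical name, and it is slightly more defensive than the diff: a `None` path or a non-dict `__data__` value also falls back to empty values instead of raising.

```python
from __future__ import annotations

import json
from typing import Any

# Width of the restored VARCHAR column (_CALLBACK_MAX_LEN in the migration).
CALLBACK_MAX_LEN = 500


def extract_path_kwargs(cb: Any) -> tuple[str, dict]:
    """Recover (path, kwargs) from a post-0080 JSON callback value (hypothetical helper)."""
    if cb is None:
        return "", {}
    if isinstance(cb, str):
        cb = json.loads(cb)
    # Accept both the full envelope and a bare {"path", "kwargs"} dict.
    inner = cb.get("__data__", cb) if isinstance(cb, dict) else {}
    if not isinstance(inner, dict):
        inner = {}
    path = inner.get("path") or ""
    if len(path) > CALLBACK_MAX_LEN:
        print(f"WARNING: callback path exceeds {CALLBACK_MAX_LEN} chars and will be truncated.")
        path = path[:CALLBACK_MAX_LEN]
    kwargs = inner.get("kwargs", {})
    if not isinstance(kwargs, dict):
        kwargs = {}
    return path, kwargs
```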
    
    Made-with: Cursor
    Co-authored-by: Cursor <[email protected]>
    
    * fix(deadline): repair NULL callback rows in 0094 upgrade
    
    Legacy MySQL deployments that ran the original (pre-#66016) 0080
    migration silently wrote NULL into deadline.callback. Alembic is already
    stamped past 808787349f22 on those deployments, so the fixed 0080 won't
    re-run -- 0094 then crashes on json.loads(None).
    
    Defensive handling in 0094:
    - _upgrade_mysql_sqlite: detect when raw_cb is None, log a warning, and
      default to an empty envelope.
    - _upgrade_postgresql: COALESCE on cb_path / cb_kwargs so NULL jsonb
      doesn't propagate into callback.data.
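The MySQL/SQLite normalisation can be sketched in plain Python; the PostgreSQL path does the equivalent work in SQL and is not shown here. `normalize_callback` and `normalize_batch` are hypothetical names. The batch helper counts NULL rows and prints one summary line rather than one warning per row.

```python
from __future__ import annotations

import json
from typing import Any


def normalize_callback(raw_cb: Any) -> tuple[dict, bool]:
    """Return ({"path", "kwargs"}, was_null) for one deadline.callback value."""
    was_null = raw_cb is None
    if was_null:
        cb: Any = {}  # legacy NULL from the original 0080 on MySQL
    elif isinstance(raw_cb, dict):
        cb = raw_cb  # driver already decoded the JSON
    else:
        cb = json.loads(raw_cb)
    inner = cb.get("__data__", cb) if isinstance(cb, dict) else {}
    if not isinstance(inner, dict):
        inner = {}
    kwargs = inner.get("kwargs", {})
    if not isinstance(kwargs, dict):
        kwargs = {}
    return {"path": inner.get("path", "") or "", "kwargs": kwargs}, was_null


def normalize_batch(raw_callbacks: list[Any]) -> list[dict]:
    """Normalize a batch and emit a single summary warning for NULL rows."""
    results, null_count = [], 0
    for raw_cb in raw_callbacks:
        data, was_null = normalize_callback(raw_cb)
        null_count += int(was_null)
        results.append(data)
    if null_count:
        print(f"WARNING: {null_count} deadline rows had NULL callback; migrated with empty envelope.")
    return results
```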
    
    Regression test starts from a post-0080 schema with callback=NULL and
    verifies 0094 produces an empty envelope without crashing, plus that a
    mixed NULL+valid batch migrates both rows correctly.
    
    Addresses ephraimbuddy's review comment on #66016.
    
    * fix(deadline): address review nits
    
    - 0094 PG path: wrap kwargs COALESCE with NULLIF so JSON-literal null
      (e.g. from hand-edited DBs) is also normalized to {} -- matches the
      MySQL/SQLite defensive normalization.
    - 0094 MySQL/SQLite: aggregate per-row NULL-callback WARNING into a
      single summary line after the loop to avoid log spam on deployments
      with many legacy NULL rows.
    - 0080 downgrade: log a WARNING when an existing row's kwargs is not a
      dict (instead of silently resetting), mirroring the VARCHAR truncate
      warning pattern.
    - 0080 tests: add test_upgrade_exact_batch_boundary covering the case
      where rows == batch_size to exercise the loop-continuation path.
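The effect of wrapping the kwargs expression in NULLIF can be approximated in SQLite, assuming kwargs is stored as plain JSON text rather than jsonb. This is only a stand-in for the PostgreSQL expression: text comparison matches the literal string 'null' exactly, whereas jsonb equality would also match a value such as ' null' with surrounding whitespace.

```python
import sqlite3

# Approximates COALESCE(NULLIF(<kwargs jsonb>, 'null'::jsonb), '{}'::jsonb)
# with JSON stored as TEXT, so 'null' is compared as a plain string.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, kwargs TEXT)")
conn.executemany(
    "INSERT INTO t (id, kwargs) VALUES (?, ?)",
    [(1, None), (2, "null"), (3, '{"k": "v"}')],  # SQL NULL, JSON null, real object
)
rows = conn.execute(
    "SELECT id, COALESCE(NULLIF(kwargs, 'null'), '{}') FROM t ORDER BY id"
).fetchall()
for row_id, normalized in rows:
    print(row_id, normalized)
```

Both the SQL NULL and the JSON-literal null collapse to `{}`, while a real object passes through unchanged.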
    
    * test(deadline): insert deadline ids in hex form to avoid SA Uuid type mismatch
    
    `_upgrade_mysql_sqlite` declares the deadline `id` column as `sa.Uuid()`.
    On SQLite the SQLAlchemy write path serializes UUID objects as a hex string
    without dashes. The previous test inserted ids via `str(uuid.uuid4())`
    (dashed form), so the SELECT correctly parsed them back to UUID objects
    but the subsequent UPDATE's WHERE clause produced the hex form -- no rows
    matched, the WHERE callback_id IS NULL filter never narrowed, and the
    loop spun until the 60s execution timeout.
    
    Insert ids via `uuid.uuid4().hex` so read and write round-trip cleanly.
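The mismatch can be reproduced with the standard library alone. Plain sqlite3 stands in for SQLAlchemy here, so the sketch binds `uuid.UUID.hex` explicitly; that is the form the message above says `sa.Uuid()` writes on SQLite.

```python
import sqlite3
import uuid

u = uuid.uuid4()
assert str(u) != u.hex  # dashed (36 chars) vs hex (32 chars)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE deadline (id TEXT PRIMARY KEY, callback_id TEXT)")

# Old test setup: the id is stored in dashed form.
conn.execute("INSERT INTO deadline (id) VALUES (?)", (str(u),))
# The UPDATE binds the hex form, so nothing matches and the row stays NULL.
dashed_matches = conn.execute(
    "UPDATE deadline SET callback_id = 'cb' WHERE id = ?", (u.hex,)
).rowcount

# Fixed test setup: store the hex form so the lookup round-trips.
conn.execute("DELETE FROM deadline")
conn.execute("INSERT INTO deadline (id) VALUES (?)", (u.hex,))
hex_matches = conn.execute(
    "UPDATE deadline SET callback_id = 'cb' WHERE id = ?", (u.hex,)
).rowcount

print(dashed_matches, hex_matches)  # 0 1
```

With the dashed id, the row never leaves the `callback_id IS NULL` set, which is why the migration loop kept re-selecting it.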
    
    * fix(deadline): drop redundant empty-rows break in 0080 batch loops
    
    * fix(deadline): restore empty-rows break in 0080 batch loops
    
    Without this guard, an empty deadline table causes executemany with an
    empty parameter list, which SQLAlchemy rejects with 'A value is required
    for bind parameter'. Reverts the removal from e50f106f.
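The interplay between the empty-rows guard and the short-batch exit can be simulated without a database. In this sketch (hypothetical `run_batches`), a Python list stands in for the rows whose new column is still NULL, and the function reports how many SELECTs and UPDATE calls the loop would issue.

```python
def run_batches(pending_ids, batch_size):
    """Simulate the 0080 batch loop; return (select_count, update_count)."""
    pending = list(pending_ids)
    selects = updates = 0
    while True:
        selects += 1
        rows = pending[:batch_size]  # SELECT ... WHERE new IS NULL LIMIT batch_size
        if not rows:  # empty-rows guard: never call executemany with []
            break
        updates += 1  # one executemany(UPDATE, params) per non-empty batch
        del pending[:batch_size]  # back-filled rows drop out of the NULL filter
        if len(rows) < batch_size:  # short batch: nothing left to read
            break
    return selects, updates
```

With an empty table the guard exits before any UPDATE; with exactly `batch_size` rows the second SELECT returns nothing, and the guard again prevents an `executemany` with an empty parameter list.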
    
    ---------
    
    Co-authored-by: Cursor <[email protected]>
---
 .../0080_3_1_0_modify_deadline_callback_schema.py  | 195 ++++++++++++++-
 ...0_replace_deadline_inline_callback_with_fkey.py |  31 ++-
 .../test_0080_deadline_callback_migration.py       | 268 +++++++++++++++++++++
 .../test_0094_deadline_callback_migration.py       | 178 ++++++++++++++
 4 files changed, 662 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
index adb3354512d..23b00e1a1eb 100644
--- a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
+++ b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py
@@ -27,8 +27,13 @@ Create Date: 2025-07-31 19:35:53.150465
 
 from __future__ import annotations
 
+import json
+from textwrap import dedent
+
 import sqlalchemy as sa
-from alembic import op
+from alembic import context, op
+
+from airflow.configuration import conf
 
 # revision identifiers, used by Alembic.
 revision = "808787349f22"
@@ -38,17 +43,199 @@ depends_on = None
 airflow_version = "3.1.0"
 
 
+_ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback"
+# Maximum length of the callback VARCHAR column in the pre-0080 schema.
+_CALLBACK_MAX_LEN = 500
+
+
 def upgrade():
     """Replace deadline table's string callback and JSON callback_kwargs with JSON callback."""
+    if context.is_offline_mode():
+        print(
+            dedent("""
+            ------------
+            --  WARNING: Unable to migrate the data in the deadline table
+            --  while in offline mode!  All rows in the deadline table will
+            --  be deleted.
+            ------------
+            """)
+        )
+        op.execute("DELETE FROM deadline")
+        with op.batch_alter_table("deadline", schema=None) as batch_op:
+            batch_op.drop_column("callback")
+            batch_op.drop_column("callback_kwargs")
+            batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False))
+        return
+
+    conn = op.get_bind()
+    batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+    # Add the destination column alongside the existing ones so we can migrate
+    # in batches without loading the whole table into memory at once.
+    with op.batch_alter_table("deadline", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("callback_new", sa.JSON(), nullable=True))
+
+    deadline_read = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback"),
+        sa.column("callback_kwargs", sa.JSON()),
+        sa.column("callback_new", sa.JSON()),
+    )
+    deadline_write = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback_new", sa.JSON()),
+    )
+
+    while True:
+        rows = conn.execute(
+            sa.select(
+                deadline_read.c.id,
+                deadline_read.c.callback,
+                deadline_read.c.callback_kwargs,
+            )
+            .where(deadline_read.c.callback_new.is_(None))
+            .limit(batch_size)
+        ).fetchall()
+
+        if not rows:
+            break
+
+        batch = []
+        for row in rows:
+            path = row[1] or ""
+            kwargs = row[2]
+            if isinstance(kwargs, str):
+                kwargs = json.loads(kwargs) if kwargs else {}
+            if not isinstance(kwargs, dict):
+                kwargs = {}
+            batch.append(
+                {
+                    "row_id": row[0],
+                    "new_callback": {
+                        "__data__": {"path": path, "kwargs": kwargs},
+                        "__classname__": _ASYNC_CALLBACK_CLASSNAME,
+                        "__version__": 0,
+                    },
+                }
+            )
+
+        conn.execute(
+            sa.update(deadline_write)
+            .where(deadline_write.c.id == sa.bindparam("row_id"))
+            .values(callback_new=sa.bindparam("new_callback")),
+            batch,
+        )
+
+        if len(rows) < batch_size:
+            break
+
     with op.batch_alter_table("deadline", schema=None) as batch_op:
         batch_op.drop_column("callback")
         batch_op.drop_column("callback_kwargs")
-        batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False))
+        batch_op.alter_column(
+            "callback_new",
+            new_column_name="callback",
+            existing_type=sa.JSON(),
+            nullable=False,
+        )
 
 
 def downgrade():
     """Replace deadline table's JSON callback with string callback and JSON callback_kwargs."""
+    if context.is_offline_mode():
+        print(
+            dedent("""
+            ------------
+            --  WARNING: Unable to migrate the data in the deadline table
+            --  while in offline mode!  All rows in the deadline table will
+            --  be deleted.
+            ------------
+            """)
+        )
+        op.execute("DELETE FROM deadline")
+        with op.batch_alter_table("deadline", schema=None) as batch_op:
+            batch_op.drop_column("callback")
+            batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), nullable=True))
+            batch_op.add_column(sa.Column("callback", sa.String(length=500), nullable=False))
+        return
+
+    conn = op.get_bind()
+    batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+    # Add the restored columns alongside the existing JSON callback so we can
+    # back-fill in batches before dropping the JSON column.
     with op.batch_alter_table("deadline", schema=None) as batch_op:
-        batch_op.drop_column("callback")
+        batch_op.add_column(sa.Column("callback_old", sa.String(length=500), nullable=True))
         batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), nullable=True))
-        batch_op.add_column(sa.Column("callback", sa.String(length=500), nullable=False))
+
+    deadline_read = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback", sa.JSON()),
+        sa.column("callback_old", sa.String(500)),
+    )
+    deadline_write = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback_old", sa.String(500)),
+        sa.column("callback_kwargs", sa.JSON()),
+    )
+
+    while True:
+        rows = conn.execute(
+            sa.select(deadline_read.c.id, deadline_read.c.callback)
+            .where(deadline_read.c.callback_old.is_(None))
+            .limit(batch_size)
+        ).fetchall()
+
+        if not rows:
+            break
+
+        batch = []
+        for row in rows:
+            cb = row[1]
+            if cb is None:
+                path, kwargs = "", {}
+            else:
+                if isinstance(cb, str):
+                    cb = json.loads(cb)
+                cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else {}
+                path = cb_inner.get("path", "")
+                if len(path) > _CALLBACK_MAX_LEN:
+                    print(
+                        f"WARNING: callback path for deadline {row[0]} exceeds "
+                        f"{_CALLBACK_MAX_LEN} chars and will be truncated."
+                    )
+                    path = path[:_CALLBACK_MAX_LEN]
+                kwargs = cb_inner.get("kwargs", {})
+                if not isinstance(kwargs, dict):
+                    print(
+                        f"WARNING: kwargs for deadline {row[0]} is not a dict "
+                        f"(type={type(kwargs).__name__}); resetting to empty dict."
+                    )
+                    kwargs = {}
+            batch.append({"row_id": row[0], "old_callback": path, "old_kwargs": kwargs})
+
+        conn.execute(
+            sa.update(deadline_write)
+            .where(deadline_write.c.id == sa.bindparam("row_id"))
+            .values(
+                callback_old=sa.bindparam("old_callback"),
+                callback_kwargs=sa.bindparam("old_kwargs"),
+            ),
+            batch,
+        )
+
+        if len(rows) < batch_size:
+            break
+
+    with op.batch_alter_table("deadline", schema=None) as batch_op:
+        batch_op.drop_column("callback")
+        batch_op.alter_column(
+            "callback_old",
+            new_column_name="callback",
+            existing_type=sa.String(500),
+            nullable=False,
+        )
diff --git a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
index 06d0f7b43ab..0cab3d57c25 100644
--- a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
+++ b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
@@ -69,8 +69,8 @@ def _upgrade_postgresql(conn, batch_size):
                         d.id AS deadline_id,
                         gen_random_uuid() AS callback_id,
                         COALESCE(dr.dag_id, '') AS dag_id,
-                        d.callback::jsonb->'__data__'->>'path' AS cb_path,
-                        d.callback::jsonb->'__data__'->'kwargs' AS cb_kwargs,
+                        COALESCE(d.callback::jsonb->'__data__'->>'path', '') AS cb_path,
+                        COALESCE(NULLIF(d.callback::jsonb->'__data__'->'kwargs', 'null'::jsonb), '{}'::jsonb) AS cb_kwargs,
                         CASE
                             WHEN d.callback_state IN (:state_success, :state_failed) THEN d.callback_state
                             ELSE :state_pending
@@ -177,6 +177,7 @@ def _upgrade_mysql_sqlite(conn, batch_size):
     )
 
     batch_num = 0
+    null_callback_count = 0
     while True:
         batch_num += 1
         batch = conn.execute(
@@ -199,11 +200,23 @@ def _upgrade_mysql_sqlite(conn, batch_size):
 
         for row in batch:
             callback_id = uuid6.uuid7()
-            cb = row.callback if isinstance(row.callback, dict) else json.loads(row.callback)
-            cb_inner = cb.get("__data__", cb)
+            raw_cb = row.callback
+            if raw_cb is None:
+                null_callback_count += 1
+                cb = {}
+            elif isinstance(raw_cb, dict):
+                cb = raw_cb
+            else:
+                cb = json.loads(raw_cb)
+            cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else {}
+            if not isinstance(cb_inner, dict):
+                cb_inner = {}
+            kwargs = cb_inner.get("kwargs", {})
+            if not isinstance(kwargs, dict):
+                kwargs = {}
             cb_data = {
-                "path": cb_inner.get("path", ""),
-                "kwargs": cb_inner.get("kwargs", {}),
+                "path": cb_inner.get("path", "") or "",
+                "kwargs": kwargs,
                 "prefix": _CALLBACK_METRICS_PREFIX,
                 "dag_id": row.dag_id or "",
             }
@@ -237,6 +250,12 @@ def _upgrade_mysql_sqlite(conn, batch_size):
         )
         print(f"Migrated {len(batch)} deadline records in batch {batch_num}")
 
+    if null_callback_count:
+        print(
+            f"WARNING: {null_callback_count} deadline rows had NULL callback "
+            "(legacy 0080 data); migrated with empty envelope."
+        )
+
 
 def upgrade():
     """Replace Deadline table's inline callback fields with callback_id foreign key."""
diff --git a/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py
new file mode 100644
index 00000000000..c777d8418b1
--- /dev/null
+++ b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Regression tests for migration 0080 (808787349f22):
+upgrade() and downgrade() must correctly migrate existing deadline rows
+without raising NotNullViolation.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import json
+import uuid
+from pathlib import Path
+from unittest import mock
+
+import sqlalchemy as sa
+from alembic.migration import MigrationContext
+from alembic.operations import Operations
+
+from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH
+
+# Migration filenames start with a digit so they cannot be imported via the
+# normal import system; load the module by file path instead.
+_MIGRATION_PATH = (
+    Path(AIRFLOW_CORE_SOURCES_PATH)
+    / "airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py"
+)
+_spec = importlib.util.spec_from_file_location("migration_0080", _MIGRATION_PATH)
+_migration = importlib.util.module_from_spec(_spec)  # type: ignore[arg-type]
+_spec.loader.exec_module(_migration)  # type: ignore[union-attr]
+
+upgrade = _migration.upgrade
+downgrade = _migration.downgrade
+_ASYNC_CALLBACK_CLASSNAME = _migration._ASYNC_CALLBACK_CLASSNAME
+
+_PRE_0080_DDL = """
+CREATE TABLE deadline (
+    id          TEXT        PRIMARY KEY,
+    dagrun_id   INTEGER     NOT NULL,
+    deadline_time TEXT      NOT NULL,
+    callback    TEXT        NOT NULL,
+    callback_kwargs TEXT,
+    created_at  TEXT,
+    last_updated_at TEXT
+)
+"""
+
+_POST_0080_DDL = """
+CREATE TABLE deadline (
+    id          TEXT        PRIMARY KEY,
+    dagrun_id   INTEGER     NOT NULL,
+    deadline_time TEXT      NOT NULL,
+    callback    TEXT        NOT NULL,
+    created_at  TEXT,
+    last_updated_at TEXT
+)
+"""
+
+
+def _make_engine_pre_0080():
+    """Return an in-memory SQLite engine with the pre-0080 deadline schema."""
+    engine = sa.create_engine("sqlite:///:memory:")
+    with engine.connect() as conn:
+        conn.execute(sa.text(_PRE_0080_DDL))
+        conn.commit()
+    return engine
+
+
+def _make_engine_post_0080():
+    """Return an in-memory SQLite engine with the post-0080 deadline schema."""
+    engine = sa.create_engine("sqlite:///:memory:")
+    with engine.connect() as conn:
+        conn.execute(sa.text(_POST_0080_DDL))
+        conn.commit()
+    return engine
+
+
+def _run_upgrade(engine):
+    # alembic.context is a proxy that is only populated when running through
+    # Alembic's full migration runner (alembic upgrade).  When calling the
+    # migration function directly in a test we must mock it so that the
+    # is_offline_mode() guard does not raise AttributeError.
+    with engine.begin() as conn:
+        with Operations.context(MigrationContext.configure(conn)):
+            with mock.patch.object(_migration, "context") as mock_ctx:
+                mock_ctx.is_offline_mode.return_value = False
+                upgrade()
+
+
+def _run_downgrade(engine):
+    with engine.begin() as conn:
+        with Operations.context(MigrationContext.configure(conn)):
+            with mock.patch.object(_migration, "context") as mock_ctx:
+                mock_ctx.is_offline_mode.return_value = False
+                downgrade()
+
+
+def _read_deadline(engine):
+    with engine.connect() as conn:
+        return conn.execute(sa.text("SELECT * FROM deadline")).mappings().all()
+
+
+class TestMigration0080Upgrade:
+    def test_upgrade_empty_table(self):
+        """Upgrade on an empty table must not raise."""
+        engine = _make_engine_pre_0080()
+        _run_upgrade(engine)
+        rows = _read_deadline(engine)
+        assert rows == []
+
+    def test_upgrade_migrates_existing_row(self):
+        """Upgrade converts VARCHAR callback + JSON kwargs to the expected JSON envelope."""
+        engine = _make_engine_pre_0080()
+        row_id = str(uuid.uuid4())
+        with engine.begin() as conn:
+            conn.execute(
+                sa.text(
+                    "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)"
+                    " VALUES (:id, 1, '2025-01-01', :cb, :kw)"
+                ),
+                {"id": row_id, "cb": "mymodule.my_callback", "kw": json.dumps({"key": "val"})},
+            )
+
+        _run_upgrade(engine)
+
+        rows = _read_deadline(engine)
+        assert len(rows) == 1
+        cb = rows[0]["callback"]
+        if isinstance(cb, str):
+            cb = json.loads(cb)
+        assert cb["__classname__"] == _ASYNC_CALLBACK_CLASSNAME
+        assert cb["__version__"] == 0
+        assert cb["__data__"]["path"] == "mymodule.my_callback"
+        assert cb["__data__"]["kwargs"] == {"key": "val"}
+        assert "callback_kwargs" not in rows[0]
+
+    def test_upgrade_null_kwargs_defaults_to_empty_dict(self):
+        """Upgrade with NULL callback_kwargs must produce an empty dict in the envelope."""
+        engine = _make_engine_pre_0080()
+        row_id = str(uuid.uuid4())
+        with engine.begin() as conn:
+            conn.execute(
+                sa.text(
+                    "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)"
+                    " VALUES (:id, 1, '2025-01-01', :cb, NULL)"
+                ),
+                {"id": row_id, "cb": "mymodule.my_callback"},
+            )
+
+        _run_upgrade(engine)
+
+        rows = _read_deadline(engine)
+        cb = rows[0]["callback"]
+        if isinstance(cb, str):
+            cb = json.loads(cb)
+        assert cb["__data__"]["kwargs"] == {}
+
+    def test_upgrade_exact_batch_boundary(self, monkeypatch):
+        """Rows == batch_size must force a second iteration that returns 0 rows and exits cleanly."""
+        # Force a small batch_size so 2 inserted rows == batch_size exactly.
+        monkeypatch.setattr(_migration.conf, "getint", lambda *a, **kw: 2)
+        engine = _make_engine_pre_0080()
+        with engine.begin() as conn:
+            for i in range(2):
+                conn.execute(
+                    sa.text(
+                        "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)"
+                        " VALUES (:id, 1, '2025-01-01', :cb, :kw)"
+                    ),
+                    {"id": str(uuid.uuid4()), "cb": f"mod.cb_{i}", "kw": json.dumps({"i": i})},
+                )
+
+        _run_upgrade(engine)
+
+        rows = _read_deadline(engine)
+        assert len(rows) == 2
+        paths = sorted(
+            (json.loads(r["callback"]) if isinstance(r["callback"], str) else r["callback"])["__data__"][
+                "path"
+            ]
+            for r in rows
+        )
+        assert paths == ["mod.cb_0", "mod.cb_1"]
+
+
+class TestMigration0080Downgrade:
+    def test_downgrade_empty_table(self):
+        """Downgrade on an empty table must not raise."""
+        engine = _make_engine_post_0080()
+        _run_downgrade(engine)
+        rows = _read_deadline(engine)
+        assert rows == []
+
+    def test_downgrade_restores_existing_row(self):
+        """Downgrade extracts path and kwargs back from the JSON envelope."""
+        engine = _make_engine_post_0080()
+        row_id = str(uuid.uuid4())
+        callback_json = json.dumps(
+            {
+                "__data__": {"path": "mymodule.my_callback", "kwargs": {"key": "val"}},
+                "__classname__": _ASYNC_CALLBACK_CLASSNAME,
+                "__version__": 0,
+            }
+        )
+        with engine.begin() as conn:
+            conn.execute(
+                sa.text(
+                    "INSERT INTO deadline (id, dagrun_id, deadline_time, callback)"
+                    " VALUES (:id, 1, '2025-01-01', :cb)"
+                ),
+                {"id": row_id, "cb": callback_json},
+            )
+
+        _run_downgrade(engine)
+
+        rows = _read_deadline(engine)
+        assert len(rows) == 1
+        assert rows[0]["callback"] == "mymodule.my_callback"
+        kw = rows[0]["callback_kwargs"]
+        if isinstance(kw, str):
+            kw = json.loads(kw)
+        assert kw == {"key": "val"}
+
+
+class TestMigration0080RoundTrip:
+    def test_round_trip_preserves_data(self):
+        """Upgrade followed by downgrade preserves the original callback path."""
+        engine = _make_engine_pre_0080()
+        row_id = str(uuid.uuid4())
+        original_path = "mymodule.my_callback"
+        original_kwargs = {"x": 1}
+
+        with engine.begin() as conn:
+            conn.execute(
+                sa.text(
+                    "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)"
+                    " VALUES (:id, 1, '2025-01-01', :cb, :kw)"
+                ),
+                {"id": row_id, "cb": original_path, "kw": json.dumps(original_kwargs)},
+            )
+
+        _run_upgrade(engine)
+        _run_downgrade(engine)
+
+        rows = _read_deadline(engine)
+        assert len(rows) == 1
+        assert rows[0]["callback"] == original_path
+        kw = rows[0]["callback_kwargs"]
+        if isinstance(kw, str):
+            kw = json.loads(kw)
+        assert kw == original_kwargs
diff --git a/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py
new file mode 100644
index 00000000000..3d848bf843a
--- /dev/null
+++ b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Regression tests for migration 0094 (e812941398f4).
+
+These tests focus on the defensive NULL-callback path: legacy MySQL
+deployments that ran the original (buggy) 0080 left ``deadline.callback``
+rows as NULL. 0094 must heal those rows instead of crashing on
+``json.loads(None)``.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import json
+import uuid
+from pathlib import Path
+
+import sqlalchemy as sa
+
+from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH
+
+_MIGRATION_PATH = (
+    Path(AIRFLOW_CORE_SOURCES_PATH)
+    / "airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py"
+)
+_spec = importlib.util.spec_from_file_location("migration_0094", _MIGRATION_PATH)
+_migration = importlib.util.module_from_spec(_spec)  # type: ignore[arg-type]
+_spec.loader.exec_module(_migration)  # type: ignore[union-attr]
+
+
+# Minimal post-0080 / pre-0094 schema. 0094 adds ``missed`` and ``callback_id``
+# itself before invoking ``_upgrade_mysql_sqlite``; we mimic that here so we
+# can call the inner helper directly without driving the full alembic chain.
+_POST_0080_DDL = [
+    """
+    CREATE TABLE dag_run (
+        id      INTEGER PRIMARY KEY,
+        dag_id  TEXT NOT NULL
+    )
+    """,
+    """
+    CREATE TABLE deadline (
+        id              TEXT    PRIMARY KEY,
+        dagrun_id       INTEGER NOT NULL,
+        deadline_time   TEXT    NOT NULL,
+        callback        TEXT,
+        callback_state  TEXT,
+        trigger_id      INTEGER,
+        callback_id     TEXT,
+        missed          BOOLEAN
+    )
+    """,
+    """
+    CREATE TABLE callback (
+        id              TEXT    PRIMARY KEY,
+        type            TEXT    NOT NULL,
+        fetch_method    TEXT    NOT NULL,
+        data            TEXT    NOT NULL,
+        state           TEXT    NOT NULL,
+        priority_weight INTEGER NOT NULL,
+        created_at      TEXT    NOT NULL
+    )
+    """,
+]
+
+
+def _make_engine():
+    engine = sa.create_engine("sqlite:///:memory:")
+    with engine.connect() as conn:
+        for ddl in _POST_0080_DDL:
+            conn.execute(sa.text(ddl))
+        conn.commit()
+    return engine
+
+
+def _insert_dagrun(conn, dagrun_id: int = 1, dag_id: str = "test_dag"):
+    conn.execute(
+        sa.text("INSERT INTO dag_run (id, dag_id) VALUES (:id, :dag_id)"),
+        {"id": dagrun_id, "dag_id": dag_id},
+    )
+
+
+def _insert_deadline(conn, deadline_id: str, callback, callback_state: str | None = None):
+    conn.execute(
+        sa.text(
+            "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_state)"
+            " VALUES (:id, 1, '2025-01-01', :cb, :state)"
+        ),
+        {"id": deadline_id, "cb": callback, "state": callback_state},
+    )
+
+
+class TestMigration0094NullCallbackRepair:
+    """A NULL callback row from a buggy 0080 must not crash 0094's upgrade."""
+
+    def test_null_callback_does_not_crash(self):
+        engine = _make_engine()
+        # `_upgrade_mysql_sqlite` declares ``id`` as ``sa.Uuid()``; on SQLite the
+        # write path emits the hex (no-dash) form. Insert IDs in that same form so
+        # the UPDATE in the migration loop matches the row we created.
+        deadline_id = uuid.uuid4().hex
+        with engine.begin() as conn:
+            _insert_dagrun(conn)
+            _insert_deadline(conn, deadline_id, callback=None)
+
+        # _upgrade_mysql_sqlite reads from `deadline` and writes to `callback`;
+        # it does not depend on alembic's batch_alter_table prelude.
+        with engine.begin() as conn:
+            _migration._upgrade_mysql_sqlite(conn, batch_size=10)
+
+        with engine.connect() as conn:
+            deadline_rows = conn.execute(sa.text("SELECT * FROM deadline")).mappings().all()
+            callback_rows = conn.execute(sa.text("SELECT * FROM callback")).mappings().all()
+
+        assert len(deadline_rows) == 1
+        assert len(callback_rows) == 1
+        assert deadline_rows[0]["callback_id"] == callback_rows[0]["id"]
+        assert deadline_rows[0]["missed"] == 0  # SQLite: False -> 0
+
+        cb_data = json.loads(callback_rows[0]["data"])
+        assert cb_data["path"] == ""
+        assert cb_data["kwargs"] == {}
+        assert cb_data["dag_id"] == "test_dag"
+
+    def test_mixed_null_and_valid_callbacks(self):
+        """A batch with both NULL and well-formed rows must migrate both."""
+        engine = _make_engine()
+        null_id = uuid.uuid4().hex
+        valid_id = uuid.uuid4().hex
+        valid_callback = json.dumps(
+            {
+                "__data__": {"path": "mymodule.cb", "kwargs": {"k": "v"}},
+                "__classname__": "airflow.sdk.definitions.deadline.AsyncCallback",
+                "__version__": 0,
+            }
+        )
+        with engine.begin() as conn:
+            _insert_dagrun(conn)
+            _insert_deadline(conn, null_id, callback=None)
+            _insert_deadline(conn, valid_id, callback=valid_callback)
+
+        with engine.begin() as conn:
+            _migration._upgrade_mysql_sqlite(conn, batch_size=10)
+
+        with engine.connect() as conn:
+            rows = (
+                conn.execute(
+                    sa.text(
+                        "SELECT d.id AS deadline_id, c.data"
+                        " FROM deadline d JOIN callback c ON d.callback_id = c.id"
+                    )
+                )
+                .mappings()
+                .all()
+            )
+
+        by_id = {r["deadline_id"]: json.loads(r["data"]) for r in rows}
+        assert by_id[null_id]["path"] == ""
+        assert by_id[null_id]["kwargs"] == {}
+        assert by_id[valid_id]["path"] == "mymodule.cb"
+        assert by_id[valid_id]["kwargs"] == {"k": "v"}
