SameerMesiah97 commented on code in PR #64751:
URL: https://github.com/apache/airflow/pull/64751#discussion_r3086269863


##########
airflow-core/src/airflow/migrations/versions/0112_3_3_0_change_deadline_interval_to_json.py:
##########
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Change type of interval in Deadline Alerts table to JSON.
+
+Revision ID: 82f208dbbad5
+Revises: a4c2d171ae18
+Create Date: 2026-04-06 16:55:46.517409
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import context, op
+
+# revision identifiers, used by Alembic.
+revision = "82f208dbbad5"
+down_revision = "a4c2d171ae18"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+    """Apply change deadline interval to JSON."""
+    conn = op.get_bind()
+    dialect = conn.dialect.name
+
+    if context.is_offline_mode():
+        print(
+            """
+            Manual conversion required:
+
+            PostgreSQL:
+            UPDATE deadline_alert
+            SET interval = json_build_object(
+                '__classname__', 'datetime.timedelta',
+                '__version__', 2,
+                '__data__', (interval::text)::float
+            )
+            WHERE jsonb_typeof(interval::jsonb) = 'number';
+
+            MySQL:
+            UPDATE deadline_alert
+            SET `interval` = JSON_OBJECT(
+                '__classname__', 'datetime.timedelta',
+                '__version__', 2,
+                '__data__', `interval`
+            );
+
+            SQLite:
+            UPDATE deadline_alert
+            SET interval =
+                '{"__classname__":"datetime.timedelta","__version__":'
+                || '2' ||
+                ',"__data__":' || CAST(interval AS TEXT) || '}';
+            """
+        )
+        return
+
+    with op.batch_alter_table("deadline_alert") as batch_op:
+        if dialect == "postgresql":
+            batch_op.alter_column(
+                "interval",
+                existing_type=sa.FLOAT(),
+                type_=sa.JSON(),
+                postgresql_using="to_json(interval)",
+                existing_nullable=False,
+            )
+        else:
+            batch_op.alter_column(
+                "interval",
+                existing_type=sa.FLOAT(),
+                type_=sa.JSON(),
+                existing_nullable=False,
+            )
+
+    if dialect == "postgresql":
+        op.execute("""
+            UPDATE deadline_alert
+            SET interval = json_build_object(
+                '__classname__', 'datetime.timedelta',
+                '__version__', 2,
+                '__data__', (interval::text)::float
+            )
+            WHERE jsonb_typeof(interval::jsonb) = 'number'
+        """)
+
+    elif dialect == "mysql":
+        op.execute("""
+            UPDATE deadline_alert
+            SET `interval` = JSON_OBJECT(
+                '__classname__', 'datetime.timedelta',
+                '__version__', 2,
+                '__data__', `interval`
+            )
+        """)
+
+    else:
+        op.execute("""
+            UPDATE deadline_alert
+            SET interval =
+                '{"__classname__":"datetime.timedelta","__version__":'
+                || '2' ||
+                ',"__data__":' || CAST(interval AS TEXT) || '}'
+            """)
+
+
+def downgrade():
+    """Revert deadline interval back to float."""
+    conn = op.get_bind()
+    dialect = conn.dialect.name
+
+    if context.is_offline_mode():
+        print(
+            """
+            Manual downgrade required:
+
+            PostgreSQL:
+            UPDATE deadline_alert
+            SET interval =
+                CASE
+                    WHEN interval::jsonb ? '__data__'
+                    THEN to_json((interval->>'__data__')::double precision)
+                    ELSE to_json((interval::text)::double precision)
+                END;
+
+            MySQL:
+            UPDATE deadline_alert
+            SET `interval` =
+                CASE
+                    WHEN JSON_EXTRACT(`interval`, '$.__data__') IS NOT NULL
+                    THEN CAST(JSON_EXTRACT(`interval`, '$.__data__') AS 
DECIMAL(20,6))
+                    ELSE CAST(`interval` AS DECIMAL(20,6))
+                END;
+
+            SQLite:
+            UPDATE deadline_alert
+            SET interval =
+                CASE
+                    WHEN json_extract(interval, '$.__data__') IS NOT NULL
+                    THEN CAST(json_extract(interval, '$.__data__') AS REAL)
+                    ELSE CAST(interval AS REAL)
+                END;
+            """
+        )
+        return
+
+    if dialect == "postgresql":
+        op.execute("""
+            UPDATE deadline_alert
+            SET interval =
+                CASE
+                    WHEN interval::jsonb ? '__data__'
+                    THEN to_json((interval->>'__data__')::double precision)
+                    ELSE to_json((interval::text)::double precision)
+                END
+        """)
+
+    elif dialect == "mysql":
+        op.execute("""
+            UPDATE deadline_alert
+            SET `interval` =
+                CASE
+                    WHEN JSON_EXTRACT(`interval`, '$.__data__') IS NOT NULL
+                    THEN CAST(JSON_EXTRACT(`interval`, '$.__data__') AS 
DECIMAL(20,6))
+                    ELSE CAST(`interval` AS DECIMAL(20,6))
+                END
+        """)
+
+    # Serialized VariableInterval objects do not contain a numeric "__data__" field
+    # and therefore cannot be converted back to a float representation.
+    # During downgrade, only timedelta-style serialized values are converted.
+    # Other serialized interval types (e.g. VariableInterval) are not explicitly
+    # handled here and will fall through to the generic casting branch, which may
+    # result in NULL or backend-specific casting behavior.
+    else:
+        # Detect availability of SQLite JSON functions (JSON1 extension).
+        # If available, use json_extract for robust parsing.
+        # Otherwise, fall back to string-based extraction.
+        json_functions_available = False
+        try:
+            conn.execute(sa.text("SELECT JSON_SET('{}', '$.test', 
'value')")).fetchone()
+            json_functions_available = True
+            print("SQLite JSON functions detected, using optimized SQL 
approach")
+        except Exception:
+            print("SQLite JSON functions not available, using Python fallback 
for JSON processing")
+
+        if json_functions_available:
+            op.execute("""
+                UPDATE deadline_alert
+                SET interval =
+                    CASE
+                        WHEN json_extract(interval, '$.__data__') IS NOT NULL
+                        THEN CAST(json_extract(interval, '$.__data__') AS REAL)
+                        ELSE CAST(interval AS REAL)
+                    END
+            """)
+        else:
+            op.execute("""
+                UPDATE deadline_alert
+                SET interval =
+                    CASE
+                        WHEN instr(interval, '__data__') > 0
+                        THEN CAST(
+                            substr(
+                                interval,
+                                instr(interval, '__data__') +
+                                instr(substr(interval, instr(interval, 
'__data__')), ':')
+                            ) AS FLOAT
+                        )
+                        ELSE CAST(interval AS FLOAT)
+                    END
+                """)
+
+    with op.batch_alter_table("deadline_alert") as batch_op:
+        if dialect == "postgresql":
+            batch_op.alter_column(
+                "interval",
+                existing_type=sa.JSON(),
+                type_=sa.FLOAT(),
+                postgresql_using="(interval->>'__data__')::double precision",
+                existing_nullable=False,
+            )

Review Comment:
   Similar to the above, the following logic has been applied here:
   
   ```
   postgresql_using="""
                   CASE
                       WHEN jsonb_typeof(interval::jsonb) = 'number'
                           THEN interval::text::double precision
                       WHEN (interval::jsonb)->>'__classname__' = 'datetime.timedelta'
                           THEN (interval->>'__data__')::double precision
                       ELSE NULL
                   END
                   """
   ```
    
   Only plain JSON numbers and serialized `timedelta` objects are now converted back to `double precision` values. Any other serialized interval type, such as `VariableInterval`, falls through to the `ELSE` branch and becomes `NULL`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to