SameerMesiah97 commented on code in PR #64751: URL: https://github.com/apache/airflow/pull/64751#discussion_r3086234645
########## airflow-core/src/airflow/migrations/versions/0112_3_3_0_change_deadline_interval_to_json.py: ########## @@ -0,0 +1,248 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Change type of interval in Deadline Alerts table to JSON. + +Revision ID: 82f208dbbad5 +Revises: a4c2d171ae18 +Create Date: 2026-04-06 16:55:46.517409 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import context, op + +# revision identifiers, used by Alembic. +revision = "82f208dbbad5" +down_revision = "a4c2d171ae18" +branch_labels = None +depends_on = None +airflow_version = "3.3.0" + + +def upgrade(): + """Apply change deadline interval to JSON.""" + conn = op.get_bind() + dialect = conn.dialect.name + + if context.is_offline_mode(): + print( + """ + Manual conversion required: + + PostgreSQL: + UPDATE deadline_alert + SET interval = json_build_object( + '__classname__', 'datetime.timedelta', + '__version__', 2, + '__data__', (interval::text)::float + ) + WHERE jsonb_typeof(interval::jsonb) = 'number'; + + MySQL: + UPDATE deadline_alert + SET `interval` = JSON_OBJECT( + '__classname__', 'datetime.timedelta', + '__version__', 2, + '__data__', `interval` + ); + + SQLite: + UPDATE deadline_alert + SET interval = + '{"__classname__":"datetime.timedelta","__version__":' + || '2' || + ',"__data__":' || CAST(interval AS TEXT) || '}'; + """ + ) + return + + with op.batch_alter_table("deadline_alert") as batch_op: + if dialect == "postgresql": + batch_op.alter_column( + "interval", + existing_type=sa.FLOAT(), + type_=sa.JSON(), + postgresql_using="to_json(interval)", + existing_nullable=False, + ) + else: + batch_op.alter_column( + "interval", + existing_type=sa.FLOAT(), + type_=sa.JSON(), + existing_nullable=False, + ) + + if dialect == "postgresql": + op.execute(""" + UPDATE deadline_alert + SET interval = json_build_object( + '__classname__', 'datetime.timedelta', + '__version__', 2, + '__data__', (interval::text)::float + ) + WHERE jsonb_typeof(interval::jsonb) = 'number' + """) + + elif dialect == "mysql": + op.execute(""" + UPDATE deadline_alert + SET `interval` = JSON_OBJECT( + '__classname__', 'datetime.timedelta', + '__version__', 2, + '__data__', `interval` + ) + """) + + else: + op.execute(""" + UPDATE deadline_alert + SET interval = + '{"__classname__":"datetime.timedelta","__version__":' + || '2' || + ',"__data__":' || CAST(interval AS TEXT) || '}' + """) + + +def downgrade(): + """Revert deadline interval back to float.""" + conn = op.get_bind() + dialect = conn.dialect.name + + if context.is_offline_mode(): + print( + """ + Manual downgrade required: + + PostgreSQL: + UPDATE deadline_alert + SET interval = + CASE + WHEN interval::jsonb ? '__data__' + THEN to_json((interval->>'__data__')::double precision) + ELSE to_json((interval::text)::double precision) + END; + + MySQL: + UPDATE deadline_alert + SET `interval` = + CASE + WHEN JSON_EXTRACT(`interval`, '$.__data__') IS NOT NULL + THEN CAST(JSON_EXTRACT(`interval`, '$.__data__') AS DECIMAL(20,6)) + ELSE CAST(`interval` AS DECIMAL(20,6)) + END; + + SQLite: + UPDATE deadline_alert + SET interval = + CASE + WHEN json_extract(interval, '$.__data__') IS NOT NULL + THEN CAST(json_extract(interval, '$.__data__') AS REAL) + ELSE CAST(interval AS REAL) + END; + """ + ) + return + + if dialect == "postgresql": + op.execute(""" + UPDATE deadline_alert + SET interval = + CASE + WHEN interval::jsonb ? '__data__' + THEN to_json((interval->>'__data__')::double precision) + ELSE to_json((interval::text)::double precision) + END + """) Review Comment: Now, only seriallized `timedelta` objects will be converted to interval values. Please see below: ``` UPDATE deadline_alert SET interval = CASE WHEN jsonb_typeof(interval::jsonb) = 'number' THEN interval WHEN (interval::jsonb)->>'__classname__' = 'datetime.timedelta' THEN to_json((interval->>'__data__')::double precision) ELSE NULL END ``` Serialized `VariableInterval` objects will become NULL in the downgrade path. This will break `DeadlineAlerts` with interval set to `VariableInterval`, which is a deliberate tradeoff in this scenario. A guard has been added at the deserialization layer to handle situations where the metadata DB has been downgraded when serialized `VariableInterval` objects exist, resulting in the interval being deserialized as `None`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
