This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new db132fb56f6 Update conf column in dag_run table type from bytes to 
JSON (#44533)
db132fb56f6 is described below

commit db132fb56f685f0aaa9f955e7e6600d0d54e3fe7
Author: vatsrahul1001 <[email protected]>
AuthorDate: Mon Jan 13 19:31:07 2025 +0530

    Update conf column in dag_run table type from bytes to JSON (#44533)
    
    * remove pickled data from dag run table
    
    * fix downgrade + add news fragment
    
    * remove archive table if exists after downgrade
    
    * removing archiving data
    
    * fixing static check
    
    * fixing static checks
    
    * simplifying upgrade and downgrade as per review
    
    * fixing failures
    
    * removing setting conf to null
    
    * refactor approach to migrate values in conf
    
    * update offline warning
    
    * resolving conflicts
    
    * resolving conflicts
    
    * resolving conflicts
    
    * updating batch size
    
    * updating conf type
    
    ---------
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
---
 ..._3_0_0_remove_pickled_data_from_dagrun_table.py | 145 +++++++++++++++++++++
 airflow/models/dagrun.py                           |   5 +-
 airflow/utils/db.py                                |   2 +-
 docs/apache-airflow/img/airflow_erd.sha256         |   2 +-
 docs/apache-airflow/img/airflow_erd.svg            |   2 +-
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 newsfragments/44533.significant.rst                |   5 +
 7 files changed, 159 insertions(+), 6 deletions(-)

diff --git 
a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py
 
b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py
new file mode 100644
index 00000000000..07d012ddf57
--- /dev/null
+++ 
b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+remove pickled data from dagrun table.
+
+Revision ID: e39a26ac59f6
+Revises: 38770795785f
+Create Date: 2024-12-01 08:33:15.425141
+
+"""
+
+from __future__ import annotations
+
+import json
+import pickle
+from textwrap import dedent
+
+import sqlalchemy as sa
+from alembic import context, op
+from sqlalchemy import text
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = "e39a26ac59f6"
+down_revision = "38770795785f"
+branch_labels = None
+depends_on = None
+airflow_version = "3.0.0"
+
+
+def upgrade():
+    """Apply remove pickled data from dagrun table."""
+    conn = op.get_bind()
+    conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql")
+    op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True))
+
+    if context.is_offline_mode():
+        print(
+            dedent("""
+            ------------
+            --  WARNING: Unable to migrate the data in the 'conf' column while 
in offline mode!
+            --  The 'conf' column will be set to NULL in offline mode.
+            --  Avoid using offline mode if you need to retain 'conf' values.
+            ------------
+            """)
+        )
+    else:
+        BATCH_SIZE = 100
+        offset = 0
+        while True:
+            rows = conn.execute(
+                text(
+                    f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order 
by id LIMIT {BATCH_SIZE} OFFSET {offset}"
+                )
+            ).fetchall()
+            if not rows:
+                break
+            for row in rows:
+                row_id, pickle_data = row
+
+                try:
+                    original_data = pickle.loads(pickle_data)
+                    json_data = json.dumps(original_data)
+                    conn.execute(
+                        text("""
+                                                UPDATE dag_run
+                                                SET conf_json = :json_data
+                                                WHERE id = :id
+                                            """),
+                        {"json_data": json_data, "id": row_id},
+                    )
+                except Exception as e:
+                    print(f"Error converting dagrun conf to json for dagrun ID 
{row_id}: {e}")
+                    continue
+            offset += BATCH_SIZE
+
+    op.drop_column("dag_run", "conf")
+
+    op.alter_column("dag_run", "conf_json", existing_type=conf_type, 
new_column_name="conf")
+
+
def downgrade():
    """Unapply Remove pickled data from dagrun table.

    Converts the JSON ``conf`` column on ``dag_run`` back to a pickled
    column in batches. Rows that cannot be pickled are reported and left
    NULL. In offline mode no data migration is possible, so ``conf`` values
    are dropped and a warning is printed into the generated SQL script.
    """
    conn = op.get_bind()
    # Stage pickled values in a temporary column, then rename it over `conf`.
    op.add_column("dag_run", sa.Column("conf_pickle", sa.PickleType(), nullable=True))

    if context.is_offline_mode():
        print(
            dedent("""
            ------------
            --  WARNING: Unable to migrate the data in the 'conf' column while in offline mode!
            --  The 'conf' column will be set to NULL in offline mode.
            --  Avoid using offline mode if you need to retain 'conf' values.
            ------------
            """)
        )

    else:
        BATCH_SIZE = 100
        last_id = 0
        while True:
            # Keyset pagination on the primary key instead of LIMIT/OFFSET:
            # OFFSET re-scans every previously read row on each batch, which
            # makes the whole migration O(n^2) on large dag_run tables.
            rows = conn.execute(
                text(
                    "SELECT id, conf FROM dag_run "
                    "WHERE conf IS NOT NULL AND id > :last_id "
                    "ORDER BY id LIMIT :batch_size"
                ),
                {"last_id": last_id, "batch_size": BATCH_SIZE},
            ).fetchall()
            if not rows:
                break
            for row_id, json_data in rows:
                try:
                    # Some drivers (e.g. SQLite, MySQL) return JSON columns
                    # from a raw SELECT as a str rather than the deserialized
                    # object; pickling that string would make unpickled conf a
                    # JSON string instead of the original dict after downgrade.
                    if isinstance(json_data, str):
                        json_data = json.loads(json_data)
                    pickled_data = pickle.dumps(json_data, protocol=pickle.HIGHEST_PROTOCOL)
                    conn.execute(
                        text("UPDATE dag_run SET conf_pickle = :pickle_data WHERE id = :id"),
                        {"pickle_data": pickled_data, "id": row_id},
                    )
                except Exception as e:
                    # Best effort: report and skip values that cannot be pickled.
                    print(f"Error pickling dagrun conf for dagrun ID {row_id}: {e}")
                    continue
            # Advance the keyset cursor past the last row of this batch.
            last_id = rows[-1][0]

    op.drop_column("dag_run", "conf")

    op.alter_column("dag_run", "conf_pickle", existing_type=sa.PickleType(), new_column_name="conf")
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index a5bef7e589c..15d275da1a4 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Callable, NamedTuple, 
TypeVar, overload
 
 import re2
 from sqlalchemy import (
+    JSON,
     Boolean,
     Column,
     Enum,
@@ -32,7 +33,6 @@ from sqlalchemy import (
     ForeignKeyConstraint,
     Index,
     Integer,
-    PickleType,
     PrimaryKeyConstraint,
     String,
     Text,
@@ -45,6 +45,7 @@ from sqlalchemy import (
     tuple_,
     update,
 )
+from sqlalchemy.dialects import postgresql
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, 
validates
@@ -138,7 +139,7 @@ class DagRun(Base, LoggingMixin):
     triggered_by = Column(
         Enum(DagRunTriggeredByType, native_enum=False, length=50)
     )  # Airflow component that triggered the run.
-    conf = Column(PickleType)
+    conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql"))
     # These two must be either both NULL or both datetime.
     data_interval_start = Column(UtcDateTime)
     data_interval_end = Column(UtcDateTime)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 29d3dc5439c..1a1eb6f4d35 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -94,7 +94,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
     "2.9.2": "686269002441",
     "2.10.0": "22ed7efa9da2",
     "2.10.3": "5f2621c13b39",
-    "3.0.0": "38770795785f",
+    "3.0.0": "e39a26ac59f6",
 }
 
 
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index 48c765c1699..3616222fd88 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-f4ad824c8d9ff45e86002506edd83b540a88dab45bb292b1af96cd86dec5ecab
\ No newline at end of file
+ca59d711e6304f8bfdb25f49339d455602430dd6b880e420869fc892faef0596
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg 
b/docs/apache-airflow/img/airflow_erd.svg
index b0f6d6b8966..24f75b32470 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1793,7 +1793,7 @@
 <polygon fill="none" stroke="black" points="1347,-3176 1347,-3201 1630,-3201 
1630,-3176 1347,-3176"/>
 <text text-anchor="start" x="1352" y="-3185.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">conf</text>
 <text text-anchor="start" x="1383" y="-3185.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1388" y="-3185.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BYTEA]</text>
+<text text-anchor="start" x="1388" y="-3185.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSONB]</text>
 <polygon fill="none" stroke="black" points="1347,-3151 1347,-3176 1630,-3176 
1630,-3151 1347,-3151"/>
 <text text-anchor="start" x="1352" y="-3160.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">creating_job_id</text>
 <text text-anchor="start" x="1456" y="-3160.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index 166dd0183a6..62013ff8f79 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``38770795785f`` (head) | ``5c9c0231baa2`` | ``3.0.0``         | Add asset 
reference models.                                  |
+| ``e39a26ac59f6`` (head) | ``38770795785f`` | ``3.0.0``         | remove 
pickled data from dagrun table.                       |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``38770795785f``        | ``5c9c0231baa2`` | ``3.0.0``         | Add asset 
reference models.                                  |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``5c9c0231baa2``        | ``237cef8dfea1`` | ``3.0.0``         | Remove 
processor_subdir.                                     |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/newsfragments/44533.significant.rst 
b/newsfragments/44533.significant.rst
new file mode 100644
index 00000000000..55619c244f5
--- /dev/null
+++ b/newsfragments/44533.significant.rst
@@ -0,0 +1,5 @@
+During offline migration, ``DagRun.conf`` is cleared
+
+.. Provide additional contextual information
+
+The ``conf`` column is changing from pickle to json, thus, the values in that 
column cannot be migrated during offline migrations. If you want to retain 
``conf`` values for existing DagRuns, you must do a normal, non-offline, 
migration.

Reply via email to