This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e8183a9e8d Add indexes on dag_id column in referencing tables to speed 
up deletion of dag records (#39638)
e8183a9e8d is described below

commit e8183a9e8d9674364eca6728f3eccd6f85d46982
Author: Pankaj Koti <pankajkoti...@gmail.com>
AuthorDate: Fri May 17 12:24:26 2024 +0530

    Add indexes on dag_id column in referencing tables to speed up deletion of 
dag records (#39638)
    
    * Add indexes on dag_id column in referencing tables to speed up deletion of 
dag records
    
    * Gracefully handle deletion of indexes on foreign key columns during 
downgrade
    
    * Correct constraint key name for dag_owner_attributes table fk
    
    * Handle ForeignKey for dag_owner_attributes table behavior based on db
    
    * Temporarily disable downgrade for dag_owner_attributes table to check the 
behavior in CI
    
    * Skip index for dag_owner_attributes table
    
    * Address @ephraimbuddy's comment
---
 ...dexes_on_dag_id_column_in_referencing_tables.py | 105 +++++++++++++++++++++
 ...=> 0144_2_10_0_add_new_executor_field_to_db.py} |   4 +-
 ..._10_0_added_dagpriorityparsingrequest_table.py} |   0
 airflow/models/dag.py                              |   2 +
 airflow/models/dagwarning.py                       |   3 +-
 airflow/models/dataset.py                          |   3 +
 airflow/utils/db.py                                |   2 +-
 docs/apache-airflow/img/airflow_erd.sha256         |   2 +-
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 9 files changed, 119 insertions(+), 6 deletions(-)

diff --git 
a/airflow/migrations/versions/0143_2_9_2_add_indexes_on_dag_id_column_in_referencing_tables.py
 
b/airflow/migrations/versions/0143_2_9_2_add_indexes_on_dag_id_column_in_referencing_tables.py
new file mode 100644
index 0000000000..6533dc5a7e
--- /dev/null
+++ 
b/airflow/migrations/versions/0143_2_9_2_add_indexes_on_dag_id_column_in_referencing_tables.py
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add indexes on dag_id column in referencing tables.
+
+Revision ID: 0fd0c178cbe8
+Revises: 686269002441
+Create Date: 2024-05-15 16:52:39.077349
+
+"""
+
+from __future__ import annotations
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "0fd0c178cbe8"
+down_revision = "686269002441"
+branch_labels = None
+depends_on = None
+airflow_version = "2.9.2"
+
+
+def upgrade():
+    """Apply Add indexes on dag_id column in referencing tables."""
+    with op.batch_alter_table("dag_schedule_dataset_reference") as batch_op:
+        batch_op.create_index("idx_dag_schedule_dataset_reference_dag_id", 
["dag_id"], unique=False)
+
+    with op.batch_alter_table("dag_tag") as batch_op:
+        batch_op.create_index("idx_dag_tag_dag_id", ["dag_id"], unique=False)
+
+    with op.batch_alter_table("dag_warning") as batch_op:
+        batch_op.create_index("idx_dag_warning_dag_id", ["dag_id"], 
unique=False)
+
+    with op.batch_alter_table("dataset_dag_run_queue") as batch_op:
+        batch_op.create_index("idx_dataset_dag_run_queue_target_dag_id", 
["target_dag_id"], unique=False)
+
+    with op.batch_alter_table("task_outlet_dataset_reference") as batch_op:
+        batch_op.create_index("idx_task_outlet_dataset_reference_dag_id", 
["dag_id"], unique=False)
+
+
+def _handle_foreign_key_constraint_index_deletion(
+    batch_op, constraint_name, index_name, local_fk_column_name
+):
+    conn = op.get_bind()
+    if conn.dialect.name == "mysql":
+        batch_op.drop_constraint(constraint_name, type_="foreignkey")
+        batch_op.drop_index(index_name)
+        batch_op.create_foreign_key(
+            constraint_name, "dag", [local_fk_column_name], ["dag_id"], 
ondelete="CASCADE"
+        )
+    else:
+        batch_op.drop_index(index_name)
+
+
+def downgrade():
+    """Unapply Add indexes on dag_id column in referencing tables."""
+    with op.batch_alter_table("dag_schedule_dataset_reference") as batch_op:
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op,
+            "dsdr_dag_id_fkey",
+            "idx_dag_schedule_dataset_reference_dag_id",
+            "dag_id",
+        )
+
+    with op.batch_alter_table("dag_tag") as batch_op:
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op, "dag_tag_dag_id_fkey", "idx_dag_tag_dag_id", "dag_id"
+        )
+
+    with op.batch_alter_table("dag_warning") as batch_op:
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op, "dcw_dag_id_fkey", "idx_dag_warning_dag_id", "dag_id"
+        )
+
+    with op.batch_alter_table("dataset_dag_run_queue") as batch_op:
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op,
+            "ddrq_dag_fkey",
+            "idx_dataset_dag_run_queue_target_dag_id",
+            "target_dag_id",
+        )
+
+    with op.batch_alter_table("task_outlet_dataset_reference") as batch_op:
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op,
+            "todr_dag_id_fkey",
+            "idx_task_outlet_dataset_reference_dag_id",
+            "dag_id",
+        )
diff --git 
a/airflow/migrations/versions/0143_2_10_0_add_new_executor_field_to_db.py 
b/airflow/migrations/versions/0144_2_10_0_add_new_executor_field_to_db.py
similarity index 96%
rename from 
airflow/migrations/versions/0143_2_10_0_add_new_executor_field_to_db.py
rename to 
airflow/migrations/versions/0144_2_10_0_add_new_executor_field_to_db.py
index 5adf5dbb33..36d02350fc 100644
--- a/airflow/migrations/versions/0143_2_10_0_add_new_executor_field_to_db.py
+++ b/airflow/migrations/versions/0144_2_10_0_add_new_executor_field_to_db.py
@@ -19,7 +19,7 @@
 """add new executor field to db.
 
 Revision ID: 677fdbb7fc54
-Revises: 686269002441
+Revises: 0fd0c178cbe8
 Create Date: 2024-04-01 15:26:59.186579
 
 """
@@ -31,7 +31,7 @@ from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = "677fdbb7fc54"
-down_revision = "686269002441"
+down_revision = "0fd0c178cbe8"
 branch_labels = None
 depends_on = None
 airflow_version = "2.10.0"
diff --git 
a/airflow/migrations/versions/0144_2_10_0_added_dagpriorityparsingrequest_table.py
 
b/airflow/migrations/versions/0145_2_10_0_added_dagpriorityparsingrequest_table.py
similarity index 100%
rename from 
airflow/migrations/versions/0144_2_10_0_added_dagpriorityparsingrequest_table.py
rename to 
airflow/migrations/versions/0145_2_10_0_added_dagpriorityparsingrequest_table.py
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c9a8424a83..91ee3b083c 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3555,6 +3555,8 @@ class DagTag(Base):
         primary_key=True,
     )
 
+    __table_args__ = (Index("idx_dag_tag_dag_id", dag_id),)
+
     def __repr__(self):
         return self.name
 
diff --git a/airflow/models/dagwarning.py b/airflow/models/dagwarning.py
index 789fe01727..ffab515f85 100644
--- a/airflow/models/dagwarning.py
+++ b/airflow/models/dagwarning.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 from enum import Enum
 from typing import TYPE_CHECKING
 
-from sqlalchemy import Column, ForeignKeyConstraint, String, Text, delete, 
false, select
+from sqlalchemy import Column, ForeignKeyConstraint, Index, String, Text, 
delete, false, select
 
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.models.base import Base, StringID
@@ -55,6 +55,7 @@ class DagWarning(Base):
             name="dcw_dag_id_fkey",
             ondelete="CASCADE",
         ),
+        Index("idx_dag_warning_dag_id", dag_id),
     )
 
     def __init__(self, dag_id: str, error_type: str, message: str, **kwargs):
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index 7b42ff324b..3df059c87d 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -138,6 +138,7 @@ class DagScheduleDatasetReference(Base):
             name="dsdr_dag_id_fkey",
             ondelete="CASCADE",
         ),
+        Index("idx_dag_schedule_dataset_reference_dag_id", dag_id),
     )
 
     def __eq__(self, other):
@@ -182,6 +183,7 @@ class TaskOutletDatasetReference(Base):
             name="todr_dag_id_fkey",
             ondelete="CASCADE",
         ),
+        Index("idx_task_outlet_dataset_reference_dag_id", dag_id),
     )
 
     def __eq__(self, other):
@@ -226,6 +228,7 @@ class DatasetDagRunQueue(Base):
             name="ddrq_dag_fkey",
             ondelete="CASCADE",
         ),
+        Index("idx_dataset_dag_run_queue_target_dag_id", target_dag_id),
     )
 
     def __eq__(self, other):
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 41fa1ab2e0..f2c837eef0 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -117,7 +117,7 @@ _REVISION_HEADS_MAP = {
     "2.8.0": "10b52ebd31f7",
     "2.8.1": "88344c1d9134",
     "2.9.0": "1949afb29106",
-    "2.9.2": "686269002441",
+    "2.9.2": "0fd0c178cbe8",
     "2.10.0": "c4602ba06b4b",
 }
 
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index f71ce47401..8263e6d131 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-62c46e83c8c2a7051fa3d7388e06535bcd01fae4cb1d01b1096cc069a82a89f8
\ No newline at end of file
+a78b6a58b3e7f006d1f460d34cd901261e56397b027c81da500151400ecac41f
\ No newline at end of file
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index 6ae768550b..45fce007f7 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -41,7 +41,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+=================================+===================+===================+==============================================================+
 | ``c4602ba06b4b`` (head)         | ``677fdbb7fc54``  | ``2.10.0``        | 
Added DagPriorityParsingRequest table.                       |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
-| ``677fdbb7fc54``                | ``686269002441``  | ``2.10.0``        | 
add new executor field to db.                                |
+| ``677fdbb7fc54``                | ``0fd0c178cbe8``  | ``2.10.0``        | 
add new executor field to db.                                |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``0fd0c178cbe8``                | ``686269002441``  | ``2.9.2``         | 
Add indexes on dag_id column in referencing tables.          |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``686269002441``                | ``bff083ad727d``  | ``2.9.2``         | 
Fix inconsistency between ORM and migration files.           |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+

Reply via email to