This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ddbe0429f09 Fix provider DB upgrades with existing tables (#66883)
ddbe0429f09 is described below

commit ddbe0429f0978f3e648c169d31bcbba93f3614d0
Author: Anmol Mishra <[email protected]>
AuthorDate: Thu May 14 01:13:16 2026 +0530

    Fix provider DB upgrades with existing tables (#66883)
    
    Co-authored-by: Anmol Mishra <[email protected]>
---
 .../edge3/src/airflow/providers/edge3/models/db.py | 63 ++++++++++++++++++++++
 providers/edge3/tests/unit/edge3/models/test_db.py | 45 ++++++++++++++++
 .../providers/fab/auth_manager/models/db.py        | 50 +++++++++++++++--
 .../tests/unit/fab/auth_manager/models/test_db.py  | 41 +++++++++++++-
 4 files changed, 194 insertions(+), 5 deletions(-)

diff --git a/providers/edge3/src/airflow/providers/edge3/models/db.py b/providers/edge3/src/airflow/providers/edge3/models/db.py
index a1cd1d42ee1..c93fb454ece 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/db.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/db.py
@@ -61,6 +61,69 @@ class EdgeDBManager(BaseDBManager):
     supports_table_dropping = True
     revision_heads_map = _REVISION_HEADS_MAP
 
+    # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+    def _has_existing_manager_tables(self) -> bool:
+        """Return whether any table managed by this DB manager already exists."""
+        inspector = inspect(self.session.get_bind())
+        table_names_by_schema: dict[str | None, set[str]] = {}
+        for table in self.metadata.tables.values():
+            table_names_by_schema.setdefault(table.schema, set()).add(table.name)
+
+        for schema, table_names in table_names_by_schema.items():
+            existing_table_names = set(inspector.get_table_names(schema=schema))
+            if table_names.intersection(existing_table_names):
+                return True
+        return False
+
+    # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+    def _get_base_revision(self, config=None) -> str:
+        """Return the first/base Alembic revision for this DB manager."""
+        script = self.get_script_object(config)
+        for revision in script.walk_revisions():
+            if revision.down_revision is None:
+                return revision.revision
+        raise RuntimeError(f"No base revision found for {self.__class__.__name__}")
+
+    # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+    def _stamp_base_revision(self, config) -> None:
+        """Stamp the database to this DB manager's base Alembic revision."""
+        from alembic import command
+
+        base_revision = self._get_base_revision(config)
+        self.log.info(
+            "%s tables already exist without an Alembic version; stamping base revision %s before upgrade",
+            self.__class__.__name__,
+            base_revision,
+        )
+        command.stamp(config, base_revision)
+
+    # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+    def upgradedb(self, to_revision=None, from_revision=None, show_sql_only=False, use_migration_files=False):
+        """Upgrade the database, handling pre-alembic installations on older Airflow versions."""
+        self.log.info("Upgrading the %s database", self.__class__.__name__)
+
+        release_metadata_locks = getattr(self, "_release_metadata_locks_if_needed", None)
+        if callable(release_metadata_locks):
+            release_metadata_locks()
+        current_revision = self.get_current_revision()
+        if callable(release_metadata_locks):
+            release_metadata_locks()
+
+        if not current_revision and not to_revision and not use_migration_files and not show_sql_only:
+            if self._has_existing_manager_tables():
+                config = self.get_alembic_config()
+                self._stamp_base_revision(config)
+            else:
+                self.create_db_from_orm()
+                return
+        else:
+            config = self.get_alembic_config()
+
+        from alembic import command
+
+        command.upgrade(config, revision=to_revision or "heads", sql=show_sql_only)
+        self.log.info("Migrated the %s database", self.__class__.__name__)
+
     def initdb(self, use_migration_files: bool = False):
         """
         Initialize the database, handling pre-alembic installations.
diff --git a/providers/edge3/tests/unit/edge3/models/test_db.py b/providers/edge3/tests/unit/edge3/models/test_db.py
index 4c2b95a9fff..812526c9e56 100644
--- a/providers/edge3/tests/unit/edge3/models/test_db.py
+++ b/providers/edge3/tests/unit/edge3/models/test_db.py
@@ -265,6 +265,51 @@ class TestEdgeDBManager:
         assert "concurrency" in columns
         assert "team_name" in columns
 
+    def test_upgradedb_stamps_and_upgrades_when_tables_exist_without_version(self, session):
+        """Test upgradedb runs incremental migrations when tables exist but alembic version table does not."""
+        from sqlalchemy import inspect, text
+
+        from airflow import settings
+        from airflow.providers.edge3.models.db import EdgeDBManager
+
+        manager = EdgeDBManager(session)
+
+        # Simulate pre-alembic state: tables exist but no version table and no concurrency column
+        with settings.engine.begin() as conn:
+            inspector = inspect(conn)
+            if inspector.has_table("alembic_version_edge3"):
+                conn.execute(text("DELETE FROM alembic_version_edge3"))
+            if "concurrency" in {col["name"] for col in inspector.get_columns("edge_worker")}:
+                from alembic.migration import MigrationContext
+                from alembic.operations import Operations
+
+                mc = MigrationContext.configure(conn, opts={"render_as_batch": True})
+                ops = Operations(mc)
+                ops.drop_column("edge_worker", "concurrency")
+                ops.add_column(
+                    "edge_worker",
+                    sa.Column("jobs_failed", sa.INTEGER(), autoincrement=False, default=0, nullable=False),
+                )
+                ops.add_column(
+                    "edge_worker",
+                    sa.Column("jobs_taken", sa.INTEGER(), autoincrement=False, default=0, nullable=False),
+                )
+                ops.add_column(
+                    "edge_worker",
+                    sa.Column("jobs_success", sa.INTEGER(), autoincrement=False, default=0, nullable=False),
+                )
+
+        # upgradedb() should detect tables exist, stamp to base, then upgrade
+        manager.upgradedb()
+
+        with settings.engine.connect() as conn:
+            version = conn.execute(text("SELECT version_num FROM alembic_version_edge3")).scalar()
+            columns = {col["name"] for col in inspect(conn).get_columns("edge_worker")}
+
+        assert version in manager.get_script_object().get_heads()
+        assert "concurrency" in columns
+        assert "team_name" in columns
+
     def test_migration_adds_concurrency_column(self, session):
         """Test that upgrading from 3.0.0 actually adds the concurrency column."""
         from alembic import command
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
index 06115e19bb7..10b83612339 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 from pathlib import Path
 
+from sqlalchemy import inspect
 from sqlalchemy.engine.url import make_url
 
 from airflow import settings
@@ -70,11 +71,48 @@ class FABDBManager(BaseDBManager):
         with flask_app.app_context():
             db.create_all()
 
+    # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+    def _has_existing_manager_tables(self) -> bool:
+        """Return whether any table managed by this DB manager already exists."""
+        inspector = inspect(self.session.get_bind())
+        table_names_by_schema: dict[str | None, set[str]] = {}
+        for table in self.metadata.tables.values():
+            table_names_by_schema.setdefault(table.schema, set()).add(table.name)
+
+        for schema, table_names in table_names_by_schema.items():
+            existing_table_names = set(inspector.get_table_names(schema=schema))
+            if table_names.intersection(existing_table_names):
+                return True
+        return False
+
+    # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+    def _get_base_revision(self, config=None) -> str:
+        """Return the first/base Alembic revision for this DB manager."""
+        script = self.get_script_object(config)
+        for revision in script.walk_revisions():
+            if revision.down_revision is None:
+                return revision.revision
+        raise RuntimeError(f"No base revision found for {self.__class__.__name__}")
+
+    # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+    def _stamp_base_revision(self, config) -> None:
+        """Stamp the database to this DB manager's base Alembic revision."""
+        from alembic import command
+
+        base_revision = self._get_base_revision(config)
+        self.log.info(
+            "%s tables already exist without an Alembic version; stamping base revision %s before upgrade",
+            self.__class__.__name__,
+            base_revision,
+        )
+        command.stamp(config, base_revision)
+
     def reset_to_2_x(self):
         self.create_db_from_orm()
         # And ensure it's at the oldest version
         self.downgrade(_REVISION_HEADS_MAP["1.4.0"])
 
+    # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
     def upgradedb(
         self,
         to_revision=None,
@@ -99,10 +137,14 @@ class FABDBManager(BaseDBManager):
         _release_metadata_locks_if_supported(self)
 
         if not current_revision and not to_revision and not use_migration_files and not show_sql_only:
-            self.create_db_from_orm()
-            return
-
-        config = self.get_alembic_config()
+            if self._has_existing_manager_tables():
+                config = self.get_alembic_config()
+                self._stamp_base_revision(config)
+            else:
+                self.create_db_from_orm()
+                return
+        else:
+            config = self.get_alembic_config()
 
         if show_sql_only:
             if make_url(settings.SQL_ALCHEMY_CONN).get_backend_name() == "sqlite":
diff --git a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
index 36fab389d2d..452e5522fb7 100644
--- a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
+++ b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
@@ -120,16 +120,55 @@ try:
 
         @mock.patch("alembic.command.upgrade")
         @mock.patch.object(FABDBManager, "create_db_from_orm")
+        @mock.patch.object(FABDBManager, "_has_existing_manager_tables", return_value=False)
         @mock.patch.object(FABDBManager, "get_current_revision", return_value=None)
         def test_upgradedb_empty_db_without_migration_files_uses_create_db_from_orm(
-            self, mock_get_current_revision, mock_create_db_from_orm, mock_upgrade, session
+            self,
+            mock_get_current_revision,
+            mock_has_existing_manager_tables,
+            mock_create_db_from_orm,
+            mock_upgrade,
+            session,
         ):
             FABDBManager(session).upgradedb()
 
+            mock_has_existing_manager_tables.assert_called_once()
             mock_create_db_from_orm.assert_called_once()
             mock_upgrade.assert_not_called()
             mock_get_current_revision.assert_called_once()
 
+        @mock.patch("alembic.command.upgrade")
+        @mock.patch("alembic.command.stamp")
+        @mock.patch.object(FABDBManager, "get_script_object")
+        @mock.patch.object(FABDBManager, "get_alembic_config", return_value=object())
+        @mock.patch.object(FABDBManager, "create_db_from_orm")
+        @mock.patch.object(FABDBManager, "_has_existing_manager_tables", return_value=True)
+        @mock.patch.object(FABDBManager, "get_current_revision", return_value=None)
+        def test_upgradedb_existing_tables_without_version_stamps_base_then_runs_migrations(
+            self,
+            mock_get_current_revision,
+            mock_has_existing_manager_tables,
+            mock_create_db_from_orm,
+            mock_get_alembic_config,
+            mock_get_script_object,
+            mock_stamp,
+            mock_upgrade,
+            session,
+        ):
+            base_revision = mock.Mock(revision="base-revision", down_revision=None)
+            mock_get_script_object.return_value.walk_revisions.return_value = [
+                mock.Mock(revision="head-revision", down_revision="base-revision"),
+                base_revision,
+            ]
+
+            FABDBManager(session).upgradedb()
+
+            mock_has_existing_manager_tables.assert_called_once()
+            mock_create_db_from_orm.assert_not_called()
+            mock_stamp.assert_called_once_with(mock_get_alembic_config.return_value, "base-revision")
+            mock_upgrade.assert_called_once_with(mock_get_alembic_config.return_value, revision="heads")
+            mock_get_current_revision.assert_called_once()
+
         @mock.patch("alembic.command.upgrade")
         @mock.patch.object(FABDBManager, "create_db_from_orm")
         @mock.patch.object(FABDBManager, "get_current_revision", return_value=None)

Reply via email to