This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ddbe0429f09 Fix provider DB upgrades with existing tables (#66883)
ddbe0429f09 is described below
commit ddbe0429f0978f3e648c169d31bcbba93f3614d0
Author: Anmol Mishra <[email protected]>
AuthorDate: Thu May 14 01:13:16 2026 +0530
Fix provider DB upgrades with existing tables (#66883)
Co-authored-by: Anmol Mishra <[email protected]>
---
.../edge3/src/airflow/providers/edge3/models/db.py | 63 ++++++++++++++++++++++
providers/edge3/tests/unit/edge3/models/test_db.py | 45 ++++++++++++++++
.../providers/fab/auth_manager/models/db.py | 50 +++++++++++++++--
.../tests/unit/fab/auth_manager/models/test_db.py | 41 +++++++++++++-
4 files changed, 194 insertions(+), 5 deletions(-)
diff --git a/providers/edge3/src/airflow/providers/edge3/models/db.py b/providers/edge3/src/airflow/providers/edge3/models/db.py
index a1cd1d42ee1..c93fb454ece 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/db.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/db.py
@@ -61,6 +61,69 @@ class EdgeDBManager(BaseDBManager):
supports_table_dropping = True
revision_heads_map = _REVISION_HEADS_MAP
+ # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+ def _has_existing_manager_tables(self) -> bool:
+ """Return whether any table managed by this DB manager already exists."""
+ inspector = inspect(self.session.get_bind())
+ table_names_by_schema: dict[str | None, set[str]] = {}
+ for table in self.metadata.tables.values():
+ table_names_by_schema.setdefault(table.schema, set()).add(table.name)
+
+ for schema, table_names in table_names_by_schema.items():
+ existing_table_names = set(inspector.get_table_names(schema=schema))
+ if table_names.intersection(existing_table_names):
+ return True
+ return False
+
+ # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+ def _get_base_revision(self, config=None) -> str:
+ """Return the first/base Alembic revision for this DB manager."""
+ script = self.get_script_object(config)
+ for revision in script.walk_revisions():
+ if revision.down_revision is None:
+ return revision.revision
+ raise RuntimeError(f"No base revision found for {self.__class__.__name__}")
+
+ # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+ def _stamp_base_revision(self, config) -> None:
+ """Stamp the database to this DB manager's base Alembic revision."""
+ from alembic import command
+
+ base_revision = self._get_base_revision(config)
+ self.log.info(
+ "%s tables already exist without an Alembic version; stamping base revision %s before upgrade",
+ self.__class__.__name__,
+ base_revision,
+ )
+ command.stamp(config, base_revision)
+
+ # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+ def upgradedb(self, to_revision=None, from_revision=None, show_sql_only=False, use_migration_files=False):
+ """Upgrade the database, handling pre-alembic installations on older Airflow versions."""
+ self.log.info("Upgrading the %s database", self.__class__.__name__)
+
+ release_metadata_locks = getattr(self, "_release_metadata_locks_if_needed", None)
+ if callable(release_metadata_locks):
+ release_metadata_locks()
+ current_revision = self.get_current_revision()
+ if callable(release_metadata_locks):
+ release_metadata_locks()
+
+ if not current_revision and not to_revision and not use_migration_files and not show_sql_only:
+ if self._has_existing_manager_tables():
+ config = self.get_alembic_config()
+ self._stamp_base_revision(config)
+ else:
+ self.create_db_from_orm()
+ return
+ else:
+ config = self.get_alembic_config()
+
+ from alembic import command
+
+ command.upgrade(config, revision=to_revision or "heads", sql=show_sql_only)
+ self.log.info("Migrated the %s database", self.__class__.__name__)
+
def initdb(self, use_migration_files: bool = False):
"""
Initialize the database, handling pre-alembic installations.
diff --git a/providers/edge3/tests/unit/edge3/models/test_db.py b/providers/edge3/tests/unit/edge3/models/test_db.py
index 4c2b95a9fff..812526c9e56 100644
--- a/providers/edge3/tests/unit/edge3/models/test_db.py
+++ b/providers/edge3/tests/unit/edge3/models/test_db.py
@@ -265,6 +265,51 @@ class TestEdgeDBManager:
assert "concurrency" in columns
assert "team_name" in columns
+ def test_upgradedb_stamps_and_upgrades_when_tables_exist_without_version(self, session):
+ """Test upgradedb runs incremental migrations when tables exist but alembic version table does not."""
+ from sqlalchemy import inspect, text
+
+ from airflow import settings
+ from airflow.providers.edge3.models.db import EdgeDBManager
+
+ manager = EdgeDBManager(session)
+
+ # Simulate pre-alembic state: tables exist but no version table and no concurrency column
+ with settings.engine.begin() as conn:
+ inspector = inspect(conn)
+ if inspector.has_table("alembic_version_edge3"):
+ conn.execute(text("DELETE FROM alembic_version_edge3"))
+ if "concurrency" in {col["name"] for col in inspector.get_columns("edge_worker")}:
+ from alembic.migration import MigrationContext
+ from alembic.operations import Operations
+
+ mc = MigrationContext.configure(conn, opts={"render_as_batch": True})
+ ops = Operations(mc)
+ ops.drop_column("edge_worker", "concurrency")
+ ops.add_column(
+ "edge_worker",
+ sa.Column("jobs_failed", sa.INTEGER(), autoincrement=False, default=0, nullable=False),
+ )
+ ops.add_column(
+ "edge_worker",
+ sa.Column("jobs_taken", sa.INTEGER(), autoincrement=False, default=0, nullable=False),
+ )
+ ops.add_column(
+ "edge_worker",
+ sa.Column("jobs_success", sa.INTEGER(), autoincrement=False, default=0, nullable=False),
+ )
+
+ # upgradedb() should detect tables exist, stamp to base, then upgrade
+ manager.upgradedb()
+
+ with settings.engine.connect() as conn:
+ version = conn.execute(text("SELECT version_num FROM alembic_version_edge3")).scalar()
+ columns = {col["name"] for col in inspect(conn).get_columns("edge_worker")}
+
+ assert version in manager.get_script_object().get_heads()
+ assert "concurrency" in columns
+ assert "team_name" in columns
+
def test_migration_adds_concurrency_column(self, session):
"""Test that upgrading from 3.0.0 actually adds the concurrency
column."""
from alembic import command
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
index 06115e19bb7..10b83612339 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
@@ -18,6 +18,7 @@ from __future__ import annotations
from pathlib import Path
+from sqlalchemy import inspect
from sqlalchemy.engine.url import make_url
from airflow import settings
@@ -70,11 +71,48 @@ class FABDBManager(BaseDBManager):
with flask_app.app_context():
db.create_all()
+ # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+ def _has_existing_manager_tables(self) -> bool:
+ """Return whether any table managed by this DB manager already exists."""
+ inspector = inspect(self.session.get_bind())
+ table_names_by_schema: dict[str | None, set[str]] = {}
+ for table in self.metadata.tables.values():
+ table_names_by_schema.setdefault(table.schema, set()).add(table.name)
+
+ for schema, table_names in table_names_by_schema.items():
+ existing_table_names = set(inspector.get_table_names(schema=schema))
+ if table_names.intersection(existing_table_names):
+ return True
+ return False
+
+ # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+ def _get_base_revision(self, config=None) -> str:
+ """Return the first/base Alembic revision for this DB manager."""
+ script = self.get_script_object(config)
+ for revision in script.walk_revisions():
+ if revision.down_revision is None:
+ return revision.revision
+ raise RuntimeError(f"No base revision found for {self.__class__.__name__}")
+
+ # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
+ def _stamp_base_revision(self, config) -> None:
+ """Stamp the database to this DB manager's base Alembic revision."""
+ from alembic import command
+
+ base_revision = self._get_base_revision(config)
+ self.log.info(
+ "%s tables already exist without an Alembic version; stamping base revision %s before upgrade",
+ self.__class__.__name__,
+ base_revision,
+ )
+ command.stamp(config, base_revision)
+
def reset_to_2_x(self):
self.create_db_from_orm()
# And ensure it's at the oldest version
self.downgrade(_REVISION_HEADS_MAP["1.4.0"])
+ # Compatibility override for Airflow < 3.3; remove when provider minimum is 3.3.
def upgradedb(
self,
to_revision=None,
@@ -99,10 +137,14 @@ class FABDBManager(BaseDBManager):
_release_metadata_locks_if_supported(self)
if not current_revision and not to_revision and not use_migration_files and not show_sql_only:
- self.create_db_from_orm()
- return
-
- config = self.get_alembic_config()
+ if self._has_existing_manager_tables():
+ config = self.get_alembic_config()
+ self._stamp_base_revision(config)
+ else:
+ self.create_db_from_orm()
+ return
+ else:
+ config = self.get_alembic_config()
if show_sql_only:
if make_url(settings.SQL_ALCHEMY_CONN).get_backend_name() == "sqlite":
diff --git a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
index 36fab389d2d..452e5522fb7 100644
--- a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
+++ b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
@@ -120,16 +120,55 @@ try:
@mock.patch("alembic.command.upgrade")
@mock.patch.object(FABDBManager, "create_db_from_orm")
+ @mock.patch.object(FABDBManager, "_has_existing_manager_tables", return_value=False)
@mock.patch.object(FABDBManager, "get_current_revision", return_value=None)
def test_upgradedb_empty_db_without_migration_files_uses_create_db_from_orm(
- self, mock_get_current_revision, mock_create_db_from_orm, mock_upgrade, session
+ self,
+ mock_get_current_revision,
+ mock_has_existing_manager_tables,
+ mock_create_db_from_orm,
+ mock_upgrade,
+ session,
):
FABDBManager(session).upgradedb()
+ mock_has_existing_manager_tables.assert_called_once()
mock_create_db_from_orm.assert_called_once()
mock_upgrade.assert_not_called()
mock_get_current_revision.assert_called_once()
+ @mock.patch("alembic.command.upgrade")
+ @mock.patch("alembic.command.stamp")
+ @mock.patch.object(FABDBManager, "get_script_object")
+ @mock.patch.object(FABDBManager, "get_alembic_config", return_value=object())
+ @mock.patch.object(FABDBManager, "create_db_from_orm")
+ @mock.patch.object(FABDBManager, "_has_existing_manager_tables", return_value=True)
+ @mock.patch.object(FABDBManager, "get_current_revision", return_value=None)
+ def test_upgradedb_existing_tables_without_version_stamps_base_then_runs_migrations(
+ self,
+ mock_get_current_revision,
+ mock_has_existing_manager_tables,
+ mock_create_db_from_orm,
+ mock_get_alembic_config,
+ mock_get_script_object,
+ mock_stamp,
+ mock_upgrade,
+ session,
+ ):
+ base_revision = mock.Mock(revision="base-revision", down_revision=None)
+ mock_get_script_object.return_value.walk_revisions.return_value = [
+ mock.Mock(revision="head-revision", down_revision="base-revision"),
+ base_revision,
+ ]
+
+ FABDBManager(session).upgradedb()
+
+ mock_has_existing_manager_tables.assert_called_once()
+ mock_create_db_from_orm.assert_not_called()
+ mock_stamp.assert_called_once_with(mock_get_alembic_config.return_value, "base-revision")
+ mock_upgrade.assert_called_once_with(mock_get_alembic_config.return_value, revision="heads")
+ mock_get_current_revision.assert_called_once()
+
@mock.patch("alembic.command.upgrade")
@mock.patch.object(FABDBManager, "create_db_from_orm")
@mock.patch.object(FABDBManager, "get_current_revision", return_value=None)