This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 671dd4cb293 Fix external DB manager upgrades with existing tables (#66674)
671dd4cb293 is described below
commit 671dd4cb2935b0736450305149c82da183ac07de
Author: Anmol Mishra <[email protected]>
AuthorDate: Wed May 13 23:36:22 2026 +0530
Fix external DB manager upgrades with existing tables (#66674)
* Fix external DB manager upgrades with existing tables
* Fix provider db manager upgrade compatibility
* Add provider DB manager compatibility comments
* Clarify provider DB manager cleanup version
* Fix Kafka trigger sync executor contention
* Stabilize Kafka message queue trigger test
* fix: narrow db manager fix to core
---------
Co-authored-by: Anmol Mishra <[email protected]>
---
airflow-core/src/airflow/utils/db_manager.py | 42 ++++++++++++++++++--
airflow-core/tests/unit/utils/test_db_manager.py | 50 +++++++++++++++++++++++-
2 files changed, 88 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/utils/db_manager.py b/airflow-core/src/airflow/utils/db_manager.py
index 8c4eb11a0d4..57173341e88 100644
--- a/airflow-core/src/airflow/utils/db_manager.py
+++ b/airflow-core/src/airflow/utils/db_manager.py
@@ -142,6 +142,37 @@ class BaseDBManager(LoggingMixin):
command.stamp(config, "head")
self.log.info("%s tables have been created from the ORM",
self.__class__.__name__)
+ def _has_existing_manager_tables(self) -> bool:
+ """Return whether any table managed by this DB manager already
exists."""
+ inspector = inspect(self.session.get_bind())
+ table_names_by_schema: dict[str | None, set[str]] = {}
+ for table in self.metadata.tables.values():
+ table_names_by_schema.setdefault(table.schema,
set()).add(table.name)
+
+ for schema, table_names in table_names_by_schema.items():
+ existing_table_names =
set(inspector.get_table_names(schema=schema))
+ if table_names.intersection(existing_table_names):
+ return True
+ return False
+
+ def _get_base_revision(self, config=None) -> str:
+ """Return the first/base Alembic revision for this DB manager."""
+ script = self.get_script_object(config)
+ for revision in script.walk_revisions():
+ if revision.down_revision is None:
+ return revision.revision
+ raise RuntimeError(f"No base revision found for
{self.__class__.__name__}")
+
+ def _stamp_base_revision(self, config) -> None:
+ """Stamp the database to this DB manager's base Alembic revision."""
+ base_revision = self._get_base_revision(config)
+ self.log.info(
+ "%s tables already exist without an Alembic version; stamping base
revision %s before upgrade",
+ self.__class__.__name__,
+ base_revision,
+ )
+ command.stamp(config, base_revision)
+
def drop_tables(self, connection):
if not self.supports_table_dropping:
return
@@ -189,10 +220,15 @@ class BaseDBManager(LoggingMixin):
self._release_metadata_locks_if_needed()
if not current_revision and not to_revision and not
use_migration_files and not show_sql_only:
- self.create_db_from_orm()
- return
+ if self._has_existing_manager_tables():
+ config = self.get_alembic_config()
+ self._stamp_base_revision(config)
+ else:
+ self.create_db_from_orm()
+ return
+ else:
+ config = self.get_alembic_config()
- config = self.get_alembic_config()
command.upgrade(config, revision=to_revision or "heads",
sql=show_sql_only)
self.log.info("Migrated the %s database", self.__class__.__name__)
diff --git a/airflow-core/tests/unit/utils/test_db_manager.py b/airflow-core/tests/unit/utils/test_db_manager.py
index fa57fd9554a..1f8f3e7423a 100644
--- a/airflow-core/tests/unit/utils/test_db_manager.py
+++ b/airflow-core/tests/unit/utils/test_db_manager.py
@@ -22,6 +22,7 @@ from contextlib import nullcontext
from unittest import mock
import pytest
+from sqlalchemy import Column, Integer, MetaData, Table
from airflow.models import Base
from airflow.utils.db_manager import BaseDBManager, RunDBManager
@@ -50,6 +51,17 @@ class CustomDBManager(BaseDBManager):
alembic_command.downgrade(config, revision=to_revision,
sql=show_sql_only)
+legacy_metadata = MetaData()
+Table("external_legacy_table", legacy_metadata, Column("id", Integer,
primary_key=True))
+
+
+class LegacyTablesDBManager(BaseDBManager):
+ metadata = legacy_metadata
+ version_table_name = "legacy_alembic_version"
+ migration_dir = "legacy_migration_dir"
+ alembic_file = "legacy_alembic.ini"
+
+
class LegacySignatureExternalManager:
initdb_calls = 0
upgradedb_calls = 0
@@ -154,15 +166,51 @@ class TestBaseDBManager:
assert "Upgrading the MockDBManager database" in caplog.text
@mock.patch.object(BaseDBManager, "create_db_from_orm")
+ @mock.patch.object(BaseDBManager, "_has_existing_manager_tables",
return_value=False)
@mock.patch.object(BaseDBManager, "get_current_revision")
def test_upgrade_empty_db_without_migration_files_uses_create_db_from_orm(
- self, mock_current_revision, mock_create_db_from_orm, session
+ self, mock_current_revision, mock_has_existing_manager_tables,
mock_create_db_from_orm, session
):
mock_current_revision.return_value = None
manager = MockDBManager(session)
manager.upgradedb()
+ mock_has_existing_manager_tables.assert_called_once()
mock_create_db_from_orm.assert_called_once()
+ @mock.patch.object(BaseDBManager, "get_current_revision",
return_value=None)
+ @mock.patch.object(BaseDBManager, "create_db_from_orm")
+ @mock.patch.object(BaseDBManager, "get_alembic_config")
+ @mock.patch.object(BaseDBManager, "get_script_object")
+ @mock.patch("airflow.utils.db_manager.inspect")
+ @mock.patch("alembic.command.stamp")
+ @mock.patch("alembic.command.upgrade")
+ def
test_upgrade_with_existing_manager_tables_without_version_stamps_base_then_runs_migrations(
+ self,
+ mock_upgrade,
+ mock_stamp,
+ mock_inspect,
+ mock_get_script_object,
+ mock_get_alembic_config,
+ mock_create_db_from_orm,
+ mock_get_current_revision,
+ session,
+ ):
+ config = object()
+ mock_get_alembic_config.return_value = config
+ base_revision = mock.Mock(revision="base-revision", down_revision=None)
+ mock_get_script_object.return_value.walk_revisions.return_value = [
+ mock.Mock(revision="head-revision", down_revision="base-revision"),
+ base_revision,
+ ]
+ mock_inspect.return_value.get_table_names.return_value =
["external_legacy_table"]
+
+ manager = LegacyTablesDBManager(session)
+ manager.upgradedb()
+
+ mock_create_db_from_orm.assert_not_called()
+ mock_stamp.assert_called_once_with(config, "base-revision")
+ mock_upgrade.assert_called_once_with(config, revision="heads",
sql=False)
+
@mock.patch.object(BaseDBManager, "get_script_object")
@mock.patch.object(BaseDBManager, "get_current_revision")
def test_check_migration(self, mock_script_obj, mock_current_revision,
session):