This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch revert-54231-downgrade-to-2.11 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 602c34641e7b1597ff3a7ef064163d8c62ff6110 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Mon Aug 11 21:32:37 2025 +0100 Revert "Allow downgrading to 2.11 from 3.x (#54231)" This reverts commit b9cdc3d9e27596ba45e3b14b36434bb2e821b3cd. --- .../src/airflow/cli/commands/db_command.py | 30 ++++++++-------------- airflow-core/src/airflow/utils/db.py | 28 ++++++++++---------- airflow-core/src/airflow/utils/db_manager.py | 6 +++++ .../tests/unit/cli/commands/test_db_command.py | 16 ------------ airflow-core/tests/unit/utils/test_db.py | 14 ++++++++++ devel-common/src/tests_common/test_utils/db.py | 19 ++------------ .../providers/fab/auth_manager/models/db.py | 5 ---- .../unit/fab/db_manager/test_fab_db_manager.py | 23 ++++++++++++++--- 8 files changed, 66 insertions(+), 75 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/db_command.py b/airflow-core/src/airflow/cli/commands/db_command.py index ad9bfc393ad..0681639ad21 100644 --- a/airflow-core/src/airflow/cli/commands/db_command.py +++ b/airflow-core/src/airflow/cli/commands/db_command.py @@ -50,39 +50,29 @@ def resetdb(args): db.resetdb(skip_init=args.skip_init) -def _get_version_revision(version: str, revision_heads_map: dict[str, str] | None = None) -> str | None: +def _get_version_revision( + version: str, recursion_limit: int = 10, revision_heads_map: dict[str, str] | None = None +) -> str | None: """ - Search for the revision of the given version in revision_heads_map. + Recursively search for the revision of the given version in revision_heads_map. This searches given revision_heads_map for the revision of the given version, recursively searching for the previous version if the given version is not found. - - ``revision_heads_map`` must already be sorted in the dict in ascending order for this function to work. No - checks are made that this is true """ if revision_heads_map is None: revision_heads_map = _REVISION_HEADS_MAP - # Exact match found, we can just return it if version in revision_heads_map: return revision_heads_map[version] - try: - wanted = tuple(map(int, version.split("."))) + major, minor, patch = map(int, version.split(".")) except ValueError: return None - - # Else, we walk backwards in the revision map until we find a version that is < the target - for revision, head in reversed(revision_heads_map.items()): - try: - current = tuple(map(int, revision.split("."))) - except ValueError: - log.debug("Unable to parse HEAD revision", exc_info=True) - return None - - if current < wanted: - return head - else: + new_version = f"{major}.{minor}.{patch - 1}" + recursion_limit -= 1 + if recursion_limit <= 0: + # Prevent infinite recursion as I can't imagine 10 successive versions without migration return None + return _get_version_revision(new_version, recursion_limit) def run_db_migrate_command(args, command, revision_heads_map: dict[str, str]): diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 1c686050d66..cd0fea8974d 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -1189,21 +1189,21 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session: config = _get_alembic_config() # Check if downgrade is less than 3.0.0 and requires that `ab_user` fab table is present if _revision_greater(config, _REVISION_HEADS_MAP["2.10.3"], to_revision): - try: - from airflow.providers.fab.auth_manager.models.db import FABDBManager - except ImportError: - # Raise the error with a new message - raise RuntimeError( - "Import error occurred while importing FABDBManager. We need that to exist before we can " - "downgrade to <3.0.0" + unitest_mode = conf.getboolean("core", "unit_test_mode") + if unitest_mode: + try: + from airflow.providers.fab.auth_manager.models.db import FABDBManager + + dbm = FABDBManager(session) + dbm.initdb() + except ImportError: + log.warning("Import error occurred while importing FABDBManager. Skipping the check.") + return + if not inspect(settings.engine).has_table("ab_user") and not unitest_mode: + raise AirflowException( + "Downgrade to revision less than 3.0.0 requires that `ab_user` table is present. " + "Please add FabDBManager to [core] external_db_managers and run fab migrations before proceeding" ) - dbm = FABDBManager(session) - if hasattr(dbm, "reset_to_2_x"): - dbm.reset_to_2_x() - else: - # Older version before we added that function, it only has a single migration so we can just - # created - dbm.create_db_from_orm() with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): if show_sql_only: log.warning("Generating sql scripts for manual migration.") diff --git a/airflow-core/src/airflow/utils/db_manager.py b/airflow-core/src/airflow/utils/db_manager.py index 37d09b4a60a..c746cdaca08 100644 --- a/airflow-core/src/airflow/utils/db_manager.py +++ b/airflow-core/src/airflow/utils/db_manager.py @@ -216,6 +216,12 @@ class RunDBManager(LoggingMixin): m = manager(session) m.upgradedb() + def downgrade(self, session): + """Downgrade the external database managers.""" + for manager in self._managers: + m = manager(session) + m.downgrade() + def drop_tables(self, session, connection): """Drop the external database managers.""" for manager in self._managers: diff --git a/airflow-core/tests/unit/cli/commands/test_db_command.py b/airflow-core/tests/unit/cli/commands/test_db_command.py index eeac6aa1688..e910cc1e314 100644 --- a/airflow-core/tests/unit/cli/commands/test_db_command.py +++ b/airflow-core/tests/unit/cli/commands/test_db_command.py @@ -658,19 +658,3 @@ class TestCLIDBClean: ) db_command.drop_archived(args) mock_drop_archived_records.assert_called_once_with(table_names=None, needs_confirm=expected) - - -def test_get_version_revision(): - heads: dict[str, str] = { - "2.10.0": "22ed7efa9da2", - "2.10.3": "5f2621c13b39", - "3.0.0": "29ce7909c52b", - "3.0.3": "fe199e1abd77", - "3.1.0": "808787349f22", - } - - assert db_command._get_version_revision("3.1.0", heads) == "808787349f22" - assert db_command._get_version_revision("3.1.1", heads) == "808787349f22" - assert db_command._get_version_revision("2.11.1", heads) == "5f2621c13b39" - assert db_command._get_version_revision("2.10.1", heads) == "22ed7efa9da2" - assert db_command._get_version_revision("2.0.0", heads) is None diff --git a/airflow-core/tests/unit/utils/test_db.py b/airflow-core/tests/unit/utils/test_db.py index 4a0d80f907f..66cddbbd5ed 100644 --- a/airflow-core/tests/unit/utils/test_db.py +++ b/airflow-core/tests/unit/utils/test_db.py @@ -33,8 +33,10 @@ from alembic.script import ScriptDirectory from sqlalchemy import Column, Integer, MetaData, Table, select from airflow import settings +from airflow.exceptions import AirflowException from airflow.models import Base as airflow_base from airflow.utils.db import ( + _REVISION_HEADS_MAP, AutocommitEngineForMySQL, LazySelectSequence, _get_alembic_config, @@ -378,6 +380,18 @@ class TestDb: assert bool(lss) is False + @conf_vars({("core", "unit_test_mode"): "False"}) + def test_downgrade_raises_if_lower_than_v3_0_0_and_no_ab_user(self, mocker): + mock_inspect = mocker.patch("airflow.utils.db.inspect") + mock_inspect.return_value.has_table.return_value = False + msg = ( + "Downgrade to revision less than 3.0.0 requires that `ab_user` table is present. " + "Please add FabDBManager to [core] external_db_managers and run fab migrations before " + "proceeding" + ) + with pytest.raises(AirflowException, match=re.escape(msg)): + downgrade(to_revision=_REVISION_HEADS_MAP["2.7.0"]) + class TestAutocommitEngineForMySQL: """Test the AutocommitEngineForMySQL context manager.""" diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py index f26b2ff6782..5248e361d18 100644 --- a/devel-common/src/tests_common/test_utils/db.py +++ b/devel-common/src/tests_common/test_utils/db.py @@ -18,7 +18,6 @@ from __future__ import annotations import json -import os from tempfile import gettempdir from typing import TYPE_CHECKING @@ -102,23 +101,9 @@ def initial_db_init(): from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS db.resetdb() - # If we are testing core we want to downgrand and upgrade to ensure things migraitons are set correctly, - # but we can skip that for providers - if AIRFLOW_V_3_0_PLUS: - if os.getenv("TEST_GROUP") != "providers": - db.downgrade(to_revision="5f2621c13b39") - db.upgradedb(to_revision="head") - else: - try: - from airflow.providers.fab.auth_manager.models.db import FABDBManager - except ModuleNotFoundError: - pass - else: - # If we loaded it (i.e. the provider exists) create the DB - with create_session() as session: - FABDBManager(session).create_db_from_orm() - session.commit() + db.downgrade(to_revision="5f2621c13b39") + db.upgradedb(to_revision="head") else: from flask import Flask diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py index d46df36af5d..1a8d66af370 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py @@ -59,11 +59,6 @@ class FABDBManager(BaseDBManager): super().create_db_from_orm() _get_flask_db(settings.SQL_ALCHEMY_CONN).create_all() - def reset_to_2_x(self): - self.create_db_from_orm() - # And ensure it's at the oldest version - self.downgrade(_REVISION_HEADS_MAP["1.4.0"]) - def upgradedb(self, to_revision=None, from_revision=None, show_sql_only=False): """Upgrade the database.""" if from_revision and not show_sql_only: diff --git a/providers/fab/tests/unit/fab/db_manager/test_fab_db_manager.py b/providers/fab/tests/unit/fab/db_manager/test_fab_db_manager.py index 3dc67bf0c9f..04fbcfa69ed 100644 --- a/providers/fab/tests/unit/fab/db_manager/test_fab_db_manager.py +++ b/providers/fab/tests/unit/fab/db_manager/test_fab_db_manager.py @@ -22,7 +22,7 @@ import pytest from sqlalchemy import Table from airflow.exceptions import AirflowException -from airflow.utils.db import initdb +from airflow.utils.db import downgrade, initdb from airflow.utils.db_manager import RunDBManager from tests_common.test_utils.config import conf_vars @@ -57,12 +57,27 @@ class TestRunDBManagerWithFab: run_db_manager.validate() metadata._remove_table("dag_run", None) + @mock.patch.object(RunDBManager, "downgrade") @mock.patch.object(RunDBManager, "upgradedb") @mock.patch.object(RunDBManager, "initdb") - def test_init_db_calls_rundbmanager(self, mock_initdb, mock_upgrade_db, session): + def test_init_db_calls_rundbmanager(self, mock_initdb, mock_upgrade_db, mock_downgrade_db, session): initdb(session=session) mock_initdb.assert_called() mock_initdb.assert_called_once_with(session) + mock_downgrade_db.assert_not_called() + + @mock.patch.object(RunDBManager, "downgrade") + @mock.patch.object(RunDBManager, "upgradedb") + @mock.patch.object(RunDBManager, "initdb") + @mock.patch("alembic.command") + def test_downgrade_dont_call_rundbmanager( + self, mock_alembic_command, mock_initdb, mock_upgrade_db, mock_downgrade_db, session + ): + downgrade(to_revision="base") + mock_alembic_command.downgrade.assert_called_once_with(mock.ANY, revision="base", sql=False) + mock_upgrade_db.assert_not_called() + mock_initdb.assert_not_called() + mock_downgrade_db.assert_not_called() @conf_vars( {("database", "external_db_managers"): "airflow.providers.fab.auth_manager.models.db.FABDBManager"} @@ -78,7 +93,9 @@ class TestRunDBManagerWithFab: # upgradedb ext_db.upgradedb(session=session) fabdb_manager.upgradedb.assert_called_once() - # drop_tables + # downgrade + ext_db.downgrade(session=session) + mock_fabdb_manager.return_value.downgrade.assert_called_once() connection = mock.MagicMock() ext_db.drop_tables(session, connection) mock_fabdb_manager.return_value.drop_tables.assert_called_once_with(connection)
