This is an automated email from the ASF dual-hosted git repository.

ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1621508b108 Revert "Remove global variables in airflow.settings 
(#67070)" (#67099)
1621508b108 is described below

commit 1621508b108095ac944144f5c9565d124dc6be85
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Mon May 18 10:54:31 2026 +0100

    Revert "Remove global variables in airflow.settings (#67070)" (#67099)
    
    This reverts commit cf709d929a0faf22420f6a207da31eb5d729483f.
    
    As discussed #67070 (review) it needs a proper discussion, not a single 
approval and yolo merge.
---
 airflow-core/src/airflow/migrations/env.py         |  2 +-
 airflow-core/src/airflow/settings.py               | 94 ++++++----------------
 airflow-core/src/airflow/utils/db.py               | 11 ++-
 airflow-core/src/airflow/utils/db_manager.py       |  2 +-
 airflow-core/tests/unit/core/test_settings.py      | 20 ++---
 .../tests/unit/core/test_sqlalchemy_config.py      | 16 ++--
 airflow-core/tests/unit/utils/test_db.py           | 32 +++-----
 .../providers/fab/auth_manager/models/db.py        | 17 ++--
 .../src/airflow/providers/fab/migrations/env.py    |  4 +-
 .../src/airflow/providers/fab/version_compat.py    |  1 -
 .../tests/unit/fab/auth_manager/models/test_db.py  | 11 +--
 .../src/airflow/sdk/execution_time/supervisor.py   |  4 +-
 12 files changed, 79 insertions(+), 135 deletions(-)

diff --git a/airflow-core/src/airflow/migrations/env.py 
b/airflow-core/src/airflow/migrations/env.py
index 2f4f8307cc8..dfb46ab6c37 100644
--- a/airflow-core/src/airflow/migrations/env.py
+++ b/airflow-core/src/airflow/migrations/env.py
@@ -80,7 +80,7 @@ def run_migrations_offline():
 
     """
     context.configure(
-        url=settings.get_sql_alchemy_conn(),
+        url=settings.SQL_ALCHEMY_CONN,
         target_metadata=target_metadata,
         literal_binds=True,
         compare_type=compare_type,
diff --git a/airflow-core/src/airflow/settings.py 
b/airflow-core/src/airflow/settings.py
index a824227b23b..739e2437f6a 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -110,15 +110,8 @@ HEADER = "\n".join(
 
 SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")
 
-
-class _AirflowSettings:
-    """Class to hold Airflow settings. This is used to avoid circular imports 
and to allow for lazy loading of settings."""
-
-    block_orm_access: bool = False
-    sql_alchemy_conn: str | None = None
-    sql_alchemy_conn_async: str | None = None
-
-
+SQL_ALCHEMY_CONN: str | None = None
+SQL_ALCHEMY_CONN_ASYNC: str | None = None
 PLUGINS_FOLDER: str | None = None
 DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", 
"DAGS_FOLDER"))
 
@@ -133,25 +126,7 @@ async_engine: AsyncEngine | None = None
 AsyncSession: Callable[..., SAAsyncSession] | None = None
 
 
-def get_sql_alchemy_conn() -> str:
-    """Get the configured SQLAlchemy connection string, raising an error if 
not configured."""
-    if _AirflowSettings.block_orm_access:
-        raise AttributeError("Access to the Airflow Metadatabase from dags is 
not allowed!")
-    if _AirflowSettings.sql_alchemy_conn is None:
-        raise RuntimeError("SQLAlchemy connection string not configured. Call 
configure_vars() first.")
-    return _AirflowSettings.sql_alchemy_conn
-
-
-def get_sql_alchemy_conn_async() -> str:
-    """Get the configured SQLAlchemy connection string, raising an error if 
not configured."""
-    if _AirflowSettings.block_orm_access:
-        raise AttributeError("Access to the Airflow Metadatabase from dags is 
not allowed!")
-    if _AirflowSettings.sql_alchemy_conn_async is None:
-        raise RuntimeError("SQLAlchemy async connection string not configured. 
Call configure_vars() first.")
-    return _AirflowSettings.sql_alchemy_conn_async
-
-
-def get_engine() -> Engine:
+def get_engine():
     """Get the configured engine, raising an error if not configured."""
     if engine is None:
         raise RuntimeError("Engine not configured. Call configure_orm() 
first.")
@@ -165,11 +140,6 @@ def get_session():
     return Session
 
 
-def block_orm_access():
-    """Block access to the ORM by marking state."""
-    _AirflowSettings.block_orm_access = True
-
-
 # The JSON library to use for DAG Serialization and De-Serialization
 json = json_lib
 
@@ -280,16 +250,16 @@ def _get_async_conn_uri_from_sync(sync_uri):
 
 def configure_vars():
     """Configure Global Variables from airflow.cfg."""
+    global SQL_ALCHEMY_CONN
+    global SQL_ALCHEMY_CONN_ASYNC
     global DAGS_FOLDER
     global PLUGINS_FOLDER
 
-    _AirflowSettings.sql_alchemy_conn = conf.get("database", 
"sql_alchemy_conn")
+    SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
     if conf.has_option("database", "sql_alchemy_conn_async"):
-        _AirflowSettings.sql_alchemy_conn_async = conf.get("database", 
"sql_alchemy_conn_async")
+        SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
     else:
-        _AirflowSettings.sql_alchemy_conn_async = 
_get_async_conn_uri_from_sync(
-            sync_uri=_AirflowSettings.sql_alchemy_conn
-        )
+        SQL_ALCHEMY_CONN_ASYNC = 
_get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)
 
     DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
 
@@ -432,8 +402,7 @@ def _configure_async_session() -> None:
     """
     global AsyncSession, async_engine
 
-    async_conn = _AirflowSettings.sql_alchemy_conn_async
-    if not async_conn:
+    if not SQL_ALCHEMY_CONN_ASYNC:
         async_engine = None
         AsyncSession = None
         return
@@ -445,14 +414,14 @@ def _configure_async_session() -> None:
     engine_args: dict[str, Any] = {}
     if not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
         engine_args["poolclass"] = NullPool
-    elif not async_conn.startswith("sqlite"):
+    elif not SQL_ALCHEMY_CONN_ASYNC.startswith("sqlite"):
         engine_args["pool_size"] = conf.getint("database", 
"SQL_ALCHEMY_POOL_SIZE", fallback=5)
         engine_args["pool_recycle"] = conf.getint("database", 
"SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)
         engine_args["pool_pre_ping"] = conf.getboolean("database", 
"SQL_ALCHEMY_POOL_PRE_PING", fallback=True)
         engine_args["max_overflow"] = conf.getint("database", 
"SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)
 
     async_engine = create_async_metadata_engine(
-        async_conn,
+        SQL_ALCHEMY_CONN_ASYNC,
         connect_args=_get_connect_args("async"),
         engine_args=engine_args,
     )
@@ -468,12 +437,11 @@ def configure_orm(disable_connection_pool=False, 
pool_class=None):
     """Configure ORM using SQLAlchemy."""
     from airflow._shared.secrets_masker import mask_secret
 
-    conn_str = get_sql_alchemy_conn()
-    if _is_sqlite_db_path_relative(conn_str):
+    if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
         from airflow.exceptions import AirflowConfigException
 
         raise AirflowConfigException(
-            f"Cannot use relative path: `{conn_str}` to connect to sqlite. "
+            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to 
sqlite. "
             "Please use absolute path such as `sqlite:////tmp/airflow.db`."
         )
 
@@ -490,13 +458,17 @@ def configure_orm(disable_connection_pool=False, 
pool_class=None):
     engine_args = prepare_engine_args(disable_connection_pool, pool_class)
 
     connect_args = _get_connect_args("sync")
-    if conn_str.startswith("sqlite"):
+    if SQL_ALCHEMY_CONN.startswith("sqlite"):
         # FastAPI runs sync endpoints in a separate thread. SQLite does not 
allow
         # to use objects created in another threads by default. Allowing that 
in test
         # to so the `test` thread and the tested endpoints can use common 
objects.
         connect_args["check_same_thread"] = False
 
-    engine = create_metadata_engine(conn_str, engine_args=engine_args, 
connect_args=connect_args)
+    engine = create_metadata_engine(
+        SQL_ALCHEMY_CONN,
+        engine_args=engine_args,
+        connect_args=connect_args,
+    )
     _configure_async_session()
     mask_secret(engine.url.password)
     setup_event_handlers(engine)
@@ -558,9 +530,8 @@ def prepare_engine_args(disable_connection_pool=False, 
pool_class=None):
     }
 
     default_args = {}
-    conn_str = get_sql_alchemy_conn()
     for dialect, default in DEFAULT_ENGINE_ARGS.items():
-        if conn_str.startswith(dialect):
+        if SQL_ALCHEMY_CONN.startswith(dialect):
             default_args = default.copy()
             break
 
@@ -572,7 +543,7 @@ def prepare_engine_args(disable_connection_pool=False, 
pool_class=None):
     elif disable_connection_pool or not conf.getboolean("database", 
"SQL_ALCHEMY_POOL_ENABLED"):
         engine_args["poolclass"] = NullPool
         log.debug("settings.prepare_engine_args(): Using NullPool")
-    elif _is_sqlite_in_memory(conn_str):
+    elif _is_sqlite_in_memory(SQL_ALCHEMY_CONN):
         # In-memory SQLite uses SingletonThreadPool which doesn't support 
pool_size/max_overflow.
         log.debug("settings.prepare_engine_args(): Skipping pool settings for 
in-memory SQLite")
     else:
@@ -625,7 +596,7 @@ def prepare_engine_args(disable_connection_pool=False, 
pool_class=None):
     # 'READ COMMITTED' is the default value for PostgreSQL.
     # More information here:
     # 
https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
-    if conn_str.startswith("mysql"):
+    if SQL_ALCHEMY_CONN.startswith("mysql"):
         engine_args["isolation_level"] = "READ COMMITTED"
 
     return engine_args
@@ -674,13 +645,12 @@ def configure_adapters():
     """Register Adapters and DB Converters."""
     from pendulum import DateTime as Pendulum
 
-    conn_str = get_sql_alchemy_conn()
-    if conn_str.startswith("sqlite"):
+    if SQL_ALCHEMY_CONN.startswith("sqlite"):
         from sqlite3 import register_adapter
 
         register_adapter(Pendulum, lambda val: val.isoformat(" "))
 
-    if conn_str.startswith("mysql"):
+    if SQL_ALCHEMY_CONN.startswith("mysql"):
         try:
             try:
                 import MySQLdb.converters
@@ -757,22 +727,6 @@ def __getattr__(name: str):
 
     from airflow.exceptions import RemovedInAirflow4Warning
 
-    if name == "SQL_ALCHEMY_CONN":
-        warnings.warn(
-            "settings.SQL_ALCHEMY_CONN has been replaced by 
get_sql_alchemy_conn(). This shim is just for compatibility. "
-            "Please upgrade your provider or integration.",
-            RemovedInAirflow4Warning,
-            stacklevel=2,
-        )
-        return get_sql_alchemy_conn()
-    if name == "SQL_ALCHEMY_CONN_ASYNC":
-        warnings.warn(
-            "settings.SQL_ALCHEMY_CONN_ASYNC has been replaced by 
get_sql_alchemy_conn_async(). "
-            "This shim is just for compatibility. Please upgrade your provider 
or integration.",
-            RemovedInAirflow4Warning,
-            stacklevel=2,
-        )
-        return get_sql_alchemy_conn_async()
     if name == "MASK_SECRETS_IN_LOGS":
         warnings.warn(
             "settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns 
default value of False. "
diff --git a/airflow-core/src/airflow/utils/db.py 
b/airflow-core/src/airflow/utils/db.py
index e611be90d0b..4caa9901bfb 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -654,8 +654,7 @@ class AutocommitEngineForMySQL:
     """
 
     def __init__(self):
-        conn_str = settings.get_sql_alchemy_conn()
-        self.is_mysql = conn_str and conn_str.lower().startswith("mysql")
+        self.is_mysql = settings.SQL_ALCHEMY_CONN and 
settings.SQL_ALCHEMY_CONN.lower().startswith("mysql")
         self.original_prepare_engine_args = None
 
     def __enter__(self):
@@ -880,7 +879,7 @@ def _get_alembic_config():
     else:
         config = Config(os.path.join(package_dir, alembic_file))
     config.set_main_option("script_location", directory.replace("%", "%%"))
-    config.set_main_option("sqlalchemy.url", 
settings.get_sql_alchemy_conn().replace("%", "%%"))
+    config.set_main_option("sqlalchemy.url", 
settings.SQL_ALCHEMY_CONN.replace("%", "%%"))
     return config
 
 
@@ -1245,6 +1244,9 @@ def upgradedb(
     if from_revision and not show_sql_only:
         raise AirflowException("`from_revision` only supported with 
`sql_only=True`.")
 
+    if not settings.SQL_ALCHEMY_CONN:
+        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is a 
critical assertion.")
+
     from alembic import command
 
     import_all_models()
@@ -1381,6 +1383,9 @@ def downgrade(*, to_revision, from_revision=None, 
show_sql_only=False, session:
             "downgrade from current revision."
         )
 
+    if not settings.SQL_ALCHEMY_CONN:
+        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")
+
     # alembic adds significant import time, so we import it lazily
     from alembic import command
 
diff --git a/airflow-core/src/airflow/utils/db_manager.py 
b/airflow-core/src/airflow/utils/db_manager.py
index 8aa76bd131b..57173341e88 100644
--- a/airflow-core/src/airflow/utils/db_manager.py
+++ b/airflow-core/src/airflow/utils/db_manager.py
@@ -100,7 +100,7 @@ class BaseDBManager(LoggingMixin):
 
         config = Config(self.alembic_file)
         config.set_main_option("script_location", 
self.migration_dir.replace("%", "%%"))
-        config.set_main_option("sqlalchemy.url", 
settings.get_sql_alchemy_conn().replace("%", "%%"))
+        config.set_main_option("sqlalchemy.url", 
settings.SQL_ALCHEMY_CONN.replace("%", "%%"))
         return config
 
     def get_script_object(self, config=None) -> ScriptDirectory:
diff --git a/airflow-core/tests/unit/core/test_settings.py 
b/airflow-core/tests/unit/core/test_settings.py
index 0126e9a4f40..588567181c5 100644
--- a/airflow-core/tests/unit/core/test_settings.py
+++ b/airflow-core/tests/unit/core/test_settings.py
@@ -243,7 +243,7 @@ class TestMetadataEngineHooks:
 
         with (
             patch("os.environ", {"_AIRFLOW_SKIP_DB_TESTS": "false"}),
-            patch("airflow.settings._AirflowSettings.sql_alchemy_conn", 
"sqlite://"),
+            patch("airflow.settings.SQL_ALCHEMY_CONN", "sqlite://"),
             patch("airflow.settings.Session"),
             patch("airflow.settings.engine"),
             patch("airflow.settings.setup_event_handlers"),
@@ -269,7 +269,7 @@ class TestMetadataEngineHooks:
         mock_create_async_engine.return_value = MagicMock()
 
         with (
-            patch("airflow.settings._AirflowSettings.sql_alchemy_conn_async", 
"sqlite+aiosqlite://"),
+            patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"sqlite+aiosqlite://"),
             patch("airflow.settings.conf") as mock_conf,
         ):
             # Pool enabled but sqlite -- pool args should be skipped
@@ -291,10 +291,7 @@ class TestMetadataEngineHooks:
         mock_create_async_engine.return_value = MagicMock()
 
         with (
-            patch(
-                "airflow.settings._AirflowSettings.sql_alchemy_conn_async",
-                "postgresql+asyncpg://localhost/airflow",
-            ),
+            patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"postgresql+asyncpg://localhost/airflow"),
             patch("airflow.settings.conf") as mock_conf,
         ):
             mock_conf.getint.side_effect = lambda section, key, fallback=None: 
{
@@ -320,10 +317,7 @@ class TestMetadataEngineHooks:
         mock_create_async_engine.return_value = MagicMock()
 
         with (
-            patch(
-                "airflow.settings._AirflowSettings.sql_alchemy_conn_async",
-                "postgresql+asyncpg://localhost/airflow",
-            ),
+            patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"postgresql+asyncpg://localhost/airflow"),
             patch("airflow.settings.conf") as mock_conf,
         ):
             mock_conf.getboolean.return_value = False
@@ -336,10 +330,10 @@ class TestMetadataEngineHooks:
 
     @patch("airflow.settings.create_async_metadata_engine")
     def test_configure_async_session_skips_when_no_async_conn(self, 
mock_create_async_engine):
-        """_configure_async_session() must not call the hook when 
``sql_alchemy_conn_async`` is empty."""
+        """_configure_async_session() must not call the hook when 
SQL_ALCHEMY_CONN_ASYNC is empty."""
         from airflow import settings
 
-        with patch("airflow.settings._AirflowSettings.sql_alchemy_conn_async", 
""):
+        with patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", ""):
             settings._configure_async_session()
 
         assert mock_create_async_engine.mock_calls == []
@@ -432,7 +426,7 @@ def test_sqlite_relative_path(value, expectation):
 
     with (
         patch("os.environ", {"_AIRFLOW_SKIP_DB_TESTS": "true"}),
-        patch("airflow.settings._AirflowSettings.sql_alchemy_conn", value),
+        patch("airflow.settings.SQL_ALCHEMY_CONN", value),
         patch("airflow.settings.Session"),
         patch("airflow.settings.engine"),
     ):
diff --git a/airflow-core/tests/unit/core/test_sqlalchemy_config.py 
b/airflow-core/tests/unit/core/test_sqlalchemy_config.py
index 15dbb5a1148..db211bab2c5 100644
--- a/airflow-core/tests/unit/core/test_sqlalchemy_config.py
+++ b/airflow-core/tests/unit/core/test_sqlalchemy_config.py
@@ -38,8 +38,8 @@ class TestSqlAlchemySettings:
         try:
             with pytest.MonkeyPatch.context() as mp:
                 mp.setattr(
-                    settings._AirflowSettings,
-                    "sql_alchemy_conn",
+                    settings,
+                    "SQL_ALCHEMY_CONN",
                     
"mysql+foobar://user:pass@host/dbname?inline=param&another=param",
                 )
                 yield
@@ -56,7 +56,7 @@ class TestSqlAlchemySettings:
         settings.configure_orm()
         expected_kwargs = dict(
             connect_args={}
-            if not settings.get_sql_alchemy_conn().startswith("sqlite")
+            if not settings.SQL_ALCHEMY_CONN.startswith("sqlite")
             else {"check_same_thread": False},
             max_overflow=10,
             pool_pre_ping=True,
@@ -66,7 +66,7 @@ class TestSqlAlchemySettings:
             future=True,
         )
         mock_create_engine.assert_called_once_with(
-            settings.get_sql_alchemy_conn(),
+            settings.SQL_ALCHEMY_CONN,
             **expected_kwargs,
         )
 
@@ -83,7 +83,7 @@ class TestSqlAlchemySettings:
         monkeypatch,
     ):
         """SQLAlchemy 2.0+ uses QueuePool for file-based SQLite, so pool 
settings should be applied."""
-        monkeypatch.setattr(settings._AirflowSettings, "sql_alchemy_conn", 
"sqlite:////tmp/airflow.db")
+        monkeypatch.setattr(settings, "SQL_ALCHEMY_CONN", 
"sqlite:////tmp/airflow.db")
         settings.configure_orm()
         expected_kwargs = dict(
             connect_args={"check_same_thread": False},
@@ -114,7 +114,7 @@ class TestSqlAlchemySettings:
     )
     def test_prepare_engine_args_sqlite_in_memory_skips_pool_settings(self, 
conn_str, monkeypatch):
         """In-memory SQLite uses SingletonThreadPool which doesn't support 
pool_size/max_overflow."""
-        monkeypatch.setattr(settings._AirflowSettings, "sql_alchemy_conn", 
conn_str)
+        monkeypatch.setattr(settings, "SQL_ALCHEMY_CONN", conn_str)
         engine_args = settings.prepare_engine_args()
         assert "pool_size" not in engine_args
         assert "max_overflow" not in engine_args
@@ -139,7 +139,7 @@ class TestSqlAlchemySettings:
         with conf_vars(config):
             settings.configure_orm()
             engine_args = {"arg": 1}
-            if settings.get_sql_alchemy_conn().startswith("mysql"):
+            if settings.SQL_ALCHEMY_CONN.startswith("mysql"):
                 engine_args["isolation_level"] = "READ COMMITTED"
             expected_kwargs = dict(
                 connect_args=SQL_ALCHEMY_CONNECT_ARGS,
@@ -148,7 +148,7 @@ class TestSqlAlchemySettings:
                 **engine_args,
             )
             mock_create_engine.assert_called_once_with(
-                settings.get_sql_alchemy_conn(),
+                settings.SQL_ALCHEMY_CONN,
                 **expected_kwargs,
             )
 
diff --git a/airflow-core/tests/unit/utils/test_db.py 
b/airflow-core/tests/unit/utils/test_db.py
index 600f08a548e..d2f13e27e9e 100644
--- a/airflow-core/tests/unit/utils/test_db.py
+++ b/airflow-core/tests/unit/utils/test_db.py
@@ -406,9 +406,7 @@ class TestAutocommitEngineForMySQL:
     def test_non_mysql_database_is_noop(self, mocker):
         """Test that non-MySQL databases don't trigger any changes."""
         # Mock settings to use PostgreSQL
-        mocker.patch.object(
-            settings._AirflowSettings, "sql_alchemy_conn", 
"postgresql://user:pass@localhost/db"
-        )
+        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"postgresql://user:pass@localhost/db")
         mock_dispose = mocker.patch.object(settings, "dispose_orm")
         mock_configure = mocker.patch.object(settings, "configure_orm")
 
@@ -430,9 +428,7 @@ class TestAutocommitEngineForMySQL:
     def test_mysql_database_modifies_engine(self, mocker):
         """Test that MySQL databases trigger AUTOCOMMIT mode."""
         # Mock settings to use MySQL
-        mocker.patch.object(
-            settings._AirflowSettings, "sql_alchemy_conn", 
"mysql+mysqlconnector://user:pass@localhost/db"
-        )
+        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"mysql+mysqlconnector://user:pass@localhost/db")
         mock_dispose = mocker.patch.object(settings, "dispose_orm")
         mock_configure = mocker.patch.object(settings, "configure_orm")
 
@@ -474,7 +470,7 @@ class TestAutocommitEngineForMySQL:
         ]
 
         for conn_string in mysql_variants:
-            mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
conn_string)
+            mocker.patch.object(settings, "SQL_ALCHEMY_CONN", conn_string)
             mock_dispose.reset_mock()
             mock_configure.reset_mock()
 
@@ -489,21 +485,18 @@ class TestAutocommitEngineForMySQL:
 
     def test_none_sql_alchemy_conn(self, mocker):
         """Test behavior when SQL_ALCHEMY_CONN is None."""
-        mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
None)
+        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", None)
         mock_dispose = mocker.patch.object(settings, "dispose_orm")
         mock_configure = mocker.patch.object(settings, "configure_orm")
 
-        with pytest.raises(RuntimeError, match="connection string not 
configured"):
-            with AutocommitEngineForMySQL():
-                pass
-
-        # Failure should not trigger any ORM disposal or reconfiguration side 
effects.
-        mock_dispose.assert_not_called()
-        mock_configure.assert_not_called()
+        with AutocommitEngineForMySQL():
+            # Should be a no-op when connection string is None
+            mock_dispose.assert_not_called()
+            mock_configure.assert_not_called()
 
     def test_prepare_engine_args_with_parameters(self, mocker):
         """Test that prepare_engine_args parameters are properly forwarded."""
-        mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
"mysql://user:pass@localhost/db")
+        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"mysql://user:pass@localhost/db")
         mocker.patch.object(settings, "dispose_orm")
         mocker.patch.object(settings, "configure_orm")
 
@@ -534,7 +527,7 @@ class TestAutocommitEngineForMySQL:
 
     def test_exception_during_context(self, mocker):
         """Test that cleanup happens even if an exception occurs."""
-        mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
"mysql://user:pass@localhost/db")
+        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"mysql://user:pass@localhost/db")
         mock_dispose = mocker.patch.object(settings, "dispose_orm")
         mock_configure = mocker.patch.object(settings, "configure_orm")
         original_prepare = mocker.Mock()
@@ -557,7 +550,7 @@ class TestAutocommitEngineForMySQL:
 
     def test_logging_messages(self, mocker):
         """Test that appropriate log messages are generated."""
-        mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
"mysql://user:pass@localhost/db")
+        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"mysql://user:pass@localhost/db")
         mocker.patch.object(settings, "dispose_orm")
         mocker.patch.object(settings, "configure_orm")
 
@@ -578,8 +571,7 @@ class TestAutocommitEngineForMySQL:
         """Test integration using a fully mocked settings module."""
         # Setup mock settings module
         mock_settings = mocker.Mock()
-        mock_settings._AirflowSettings = mocker.Mock()
-        mock_settings._AirflowSettings.sql_alchemy_conn = 
"mysql://user:pass@localhost/db"
+        mock_settings.SQL_ALCHEMY_CONN = "mysql://user:pass@localhost/db"
         mock_settings.dispose_orm = mocker.Mock()
         mock_settings.configure_orm = mocker.Mock()
         original_prepare = mocker.Mock(return_value={"test": "value"})
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py 
b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
index a62cbcc409a..10b83612339 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
@@ -24,7 +24,6 @@ from sqlalchemy.engine.url import make_url
 from airflow import settings
 from airflow.providers.common.compat.sdk import AirflowException
 from airflow.providers.fab.auth_manager.models import model_metadata
-from airflow.providers.fab.version_compat import AIRFLOW_V_3_3_PLUS
 from airflow.utils.db import _offline_migration, print_happy_cat
 from airflow.utils.db_manager import BaseDBManager
 
@@ -68,8 +67,7 @@ class FABDBManager(BaseDBManager):
 
     def create_db_from_orm(self):
         super().create_db_from_orm()
-        sql_conn = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else 
settings.SQL_ALCHEMY_CONN
-        db, flask_app = _get_flask_db(sql_conn)
+        db, flask_app = _get_flask_db(settings.SQL_ALCHEMY_CONN)
         with flask_app.app_context():
             db.create_all()
 
@@ -129,6 +127,8 @@ class FABDBManager(BaseDBManager):
         _release_metadata_locks_if_supported(self)
 
         # alembic adds significant import time, so we import it lazily
+        if not settings.SQL_ALCHEMY_CONN:
+            raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is 
a critical assertion.")
         from alembic import command
 
         current_revision = self.get_current_revision()
@@ -147,10 +147,7 @@ class FABDBManager(BaseDBManager):
             config = self.get_alembic_config()
 
         if show_sql_only:
-            sql_conn: str = (
-                settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else 
str(settings.SQL_ALCHEMY_CONN)
-            )
-            if make_url(sql_conn).get_backend_name() == "sqlite":
+            if make_url(settings.SQL_ALCHEMY_CONN).get_backend_name() == 
"sqlite":
                 raise SystemExit("Offline migration not supported for SQLite.")
             if not from_revision:
                 from_revision = current_revision
@@ -175,6 +172,9 @@ class FABDBManager(BaseDBManager):
                 "downgrade from current revision."
             )
 
+        if not settings.SQL_ALCHEMY_CONN:
+            raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")
+
         # alembic adds significant import time, so we import it lazily
         from alembic import command
 
@@ -200,7 +200,6 @@ class FABDBManager(BaseDBManager):
 
     def drop_tables(self, connection):
         super().drop_tables(connection)
-        sql_conn = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else 
settings.SQL_ALCHEMY_CONN
-        db, flask_app = _get_flask_db(sql_conn)
+        db, flask_app = _get_flask_db(settings.SQL_ALCHEMY_CONN)
         with flask_app.app_context():
             db.drop_all()
diff --git a/providers/fab/src/airflow/providers/fab/migrations/env.py 
b/providers/fab/src/airflow/providers/fab/migrations/env.py
index 7a812c56e7a..903057ba602 100644
--- a/providers/fab/src/airflow/providers/fab/migrations/env.py
+++ b/providers/fab/src/airflow/providers/fab/migrations/env.py
@@ -24,7 +24,6 @@ from alembic import context
 
 from airflow import settings
 from airflow.providers.fab.auth_manager.models.db import FABDBManager
-from airflow.providers.fab.version_compat import AIRFLOW_V_3_3_PLUS
 
 # this is the Alembic Config object, which provides
 # access to the values within the .ini file in use.
@@ -68,9 +67,8 @@ def run_migrations_offline():
     script output.
 
     """
-    url: str = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else 
str(settings.SQL_ALCHEMY_CONN)
     context.configure(
-        url=url,
+        url=settings.SQL_ALCHEMY_CONN,
         target_metadata=target_metadata,
         literal_binds=True,
         compare_type=True,
diff --git a/providers/fab/src/airflow/providers/fab/version_compat.py 
b/providers/fab/src/airflow/providers/fab/version_compat.py
index 419ebf48568..92feca19762 100644
--- a/providers/fab/src/airflow/providers/fab/version_compat.py
+++ b/providers/fab/src/airflow/providers/fab/version_compat.py
@@ -36,4 +36,3 @@ AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 
1, 0)
 AIRFLOW_V_3_1_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 1)
 AIRFLOW_V_3_1_8_PLUS = get_base_airflow_version_tuple() >= (3, 1, 8)
 AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
-AIRFLOW_V_3_3_PLUS = get_base_airflow_version_tuple() >= (3, 3, 0)
diff --git a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py 
b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
index 69154acd5b4..452e5522fb7 100644
--- a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
+++ b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
@@ -32,8 +32,6 @@ from airflow.utils.db import (
     compare_type,
 )
 
-from tests_common.test_utils.config import conf_vars
-
 pytestmark = [pytest.mark.db_test]
 try:
     from airflow.providers.fab.auth_manager.models.db import FABDBManager
@@ -111,11 +109,14 @@ try:
             actual = mock_om.call_args.kwargs["revision"]
             assert actual == "abc"
 
-        @conf_vars({("database", "sql_alchemy_conn"): 
"sqlite:////tmp/fab-test.db"})
         @mock.patch.object(FABDBManager, "get_current_revision")
         def test_sqlite_offline_upgrade_raises_with_revision(self, mock_gcr, 
session):
-            with pytest.raises(SystemExit, match="Offline migration not 
supported for SQLite"):
-                FABDBManager(session).upgradedb(from_revision=None, 
to_revision=None, show_sql_only=True)
+            with mock.patch(
+                
"airflow.providers.fab.auth_manager.models.db.settings.SQL_ALCHEMY_CONN",
+                "sqlite:////tmp/fab-test.db",
+            ):
+                with pytest.raises(SystemExit, match="Offline migration not 
supported for SQLite"):
+                    FABDBManager(session).upgradedb(from_revision=None, 
to_revision=None, show_sql_only=True)
 
         @mock.patch("alembic.command.upgrade")
         @mock.patch.object(FABDBManager, "create_db_from_orm")
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 2269f22420f..3e6236c5786 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -344,7 +344,6 @@ def block_orm_access():
             conf.set("database", "sql_alchemy_conn", conn)
             conf.set("database", "sql_alchemy_conn_cmd", "/bin/false")
             conf.set("database", "sql_alchemy_conn_secret", 
"db-access-blocked")
-        settings.block_orm_access()
 
         # This only gets called when the module does not already have an 
attribute, and for these values
         # lets give a custom error message
@@ -355,6 +354,9 @@ def block_orm_access():
 
         settings.__getattr__ = __getattr__
 
+        settings.SQL_ALCHEMY_CONN = conn
+        settings.SQL_ALCHEMY_CONN_ASYNC = conn
+
     os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = conn
     os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = conn
 

Reply via email to