This is an automated email from the ASF dual-hosted git repository.
ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1621508b108 Revert "Remove global variables in airflow.settings
(#67070)" (#67099)
1621508b108 is described below
commit 1621508b108095ac944144f5c9565d124dc6be85
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Mon May 18 10:54:31 2026 +0100
Revert "Remove global variables in airflow.settings (#67070)" (#67099)
This reverts commit cf709d929a0faf22420f6a207da31eb5d729483f.
As discussed in #67070 (review), it needs a proper discussion, not a single
approval and yolo merge.
---
airflow-core/src/airflow/migrations/env.py | 2 +-
airflow-core/src/airflow/settings.py | 94 ++++++----------------
airflow-core/src/airflow/utils/db.py | 11 ++-
airflow-core/src/airflow/utils/db_manager.py | 2 +-
airflow-core/tests/unit/core/test_settings.py | 20 ++---
.../tests/unit/core/test_sqlalchemy_config.py | 16 ++--
airflow-core/tests/unit/utils/test_db.py | 32 +++-----
.../providers/fab/auth_manager/models/db.py | 17 ++--
.../src/airflow/providers/fab/migrations/env.py | 4 +-
.../src/airflow/providers/fab/version_compat.py | 1 -
.../tests/unit/fab/auth_manager/models/test_db.py | 11 +--
.../src/airflow/sdk/execution_time/supervisor.py | 4 +-
12 files changed, 79 insertions(+), 135 deletions(-)
diff --git a/airflow-core/src/airflow/migrations/env.py
b/airflow-core/src/airflow/migrations/env.py
index 2f4f8307cc8..dfb46ab6c37 100644
--- a/airflow-core/src/airflow/migrations/env.py
+++ b/airflow-core/src/airflow/migrations/env.py
@@ -80,7 +80,7 @@ def run_migrations_offline():
"""
context.configure(
- url=settings.get_sql_alchemy_conn(),
+ url=settings.SQL_ALCHEMY_CONN,
target_metadata=target_metadata,
literal_binds=True,
compare_type=compare_type,
diff --git a/airflow-core/src/airflow/settings.py
b/airflow-core/src/airflow/settings.py
index a824227b23b..739e2437f6a 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -110,15 +110,8 @@ HEADER = "\n".join(
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")
-
-class _AirflowSettings:
- """Class to hold Airflow settings. This is used to avoid circular imports
and to allow for lazy loading of settings."""
-
- block_orm_access: bool = False
- sql_alchemy_conn: str | None = None
- sql_alchemy_conn_async: str | None = None
-
-
+SQL_ALCHEMY_CONN: str | None = None
+SQL_ALCHEMY_CONN_ASYNC: str | None = None
PLUGINS_FOLDER: str | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core",
"DAGS_FOLDER"))
@@ -133,25 +126,7 @@ async_engine: AsyncEngine | None = None
AsyncSession: Callable[..., SAAsyncSession] | None = None
-def get_sql_alchemy_conn() -> str:
- """Get the configured SQLAlchemy connection string, raising an error if
not configured."""
- if _AirflowSettings.block_orm_access:
- raise AttributeError("Access to the Airflow Metadatabase from dags is
not allowed!")
- if _AirflowSettings.sql_alchemy_conn is None:
- raise RuntimeError("SQLAlchemy connection string not configured. Call
configure_vars() first.")
- return _AirflowSettings.sql_alchemy_conn
-
-
-def get_sql_alchemy_conn_async() -> str:
- """Get the configured SQLAlchemy connection string, raising an error if
not configured."""
- if _AirflowSettings.block_orm_access:
- raise AttributeError("Access to the Airflow Metadatabase from dags is
not allowed!")
- if _AirflowSettings.sql_alchemy_conn_async is None:
- raise RuntimeError("SQLAlchemy async connection string not configured.
Call configure_vars() first.")
- return _AirflowSettings.sql_alchemy_conn_async
-
-
-def get_engine() -> Engine:
+def get_engine():
"""Get the configured engine, raising an error if not configured."""
if engine is None:
raise RuntimeError("Engine not configured. Call configure_orm()
first.")
@@ -165,11 +140,6 @@ def get_session():
return Session
-def block_orm_access():
- """Block access to the ORM by marking state."""
- _AirflowSettings.block_orm_access = True
-
-
# The JSON library to use for DAG Serialization and De-Serialization
json = json_lib
@@ -280,16 +250,16 @@ def _get_async_conn_uri_from_sync(sync_uri):
def configure_vars():
"""Configure Global Variables from airflow.cfg."""
+ global SQL_ALCHEMY_CONN
+ global SQL_ALCHEMY_CONN_ASYNC
global DAGS_FOLDER
global PLUGINS_FOLDER
- _AirflowSettings.sql_alchemy_conn = conf.get("database",
"sql_alchemy_conn")
+ SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
if conf.has_option("database", "sql_alchemy_conn_async"):
- _AirflowSettings.sql_alchemy_conn_async = conf.get("database",
"sql_alchemy_conn_async")
+ SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
else:
- _AirflowSettings.sql_alchemy_conn_async =
_get_async_conn_uri_from_sync(
- sync_uri=_AirflowSettings.sql_alchemy_conn
- )
+ SQL_ALCHEMY_CONN_ASYNC =
_get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)
DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
@@ -432,8 +402,7 @@ def _configure_async_session() -> None:
"""
global AsyncSession, async_engine
- async_conn = _AirflowSettings.sql_alchemy_conn_async
- if not async_conn:
+ if not SQL_ALCHEMY_CONN_ASYNC:
async_engine = None
AsyncSession = None
return
@@ -445,14 +414,14 @@ def _configure_async_session() -> None:
engine_args: dict[str, Any] = {}
if not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
engine_args["poolclass"] = NullPool
- elif not async_conn.startswith("sqlite"):
+ elif not SQL_ALCHEMY_CONN_ASYNC.startswith("sqlite"):
engine_args["pool_size"] = conf.getint("database",
"SQL_ALCHEMY_POOL_SIZE", fallback=5)
engine_args["pool_recycle"] = conf.getint("database",
"SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)
engine_args["pool_pre_ping"] = conf.getboolean("database",
"SQL_ALCHEMY_POOL_PRE_PING", fallback=True)
engine_args["max_overflow"] = conf.getint("database",
"SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)
async_engine = create_async_metadata_engine(
- async_conn,
+ SQL_ALCHEMY_CONN_ASYNC,
connect_args=_get_connect_args("async"),
engine_args=engine_args,
)
@@ -468,12 +437,11 @@ def configure_orm(disable_connection_pool=False,
pool_class=None):
"""Configure ORM using SQLAlchemy."""
from airflow._shared.secrets_masker import mask_secret
- conn_str = get_sql_alchemy_conn()
- if _is_sqlite_db_path_relative(conn_str):
+ if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
from airflow.exceptions import AirflowConfigException
raise AirflowConfigException(
- f"Cannot use relative path: `{conn_str}` to connect to sqlite. "
+ f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to
sqlite. "
"Please use absolute path such as `sqlite:////tmp/airflow.db`."
)
@@ -490,13 +458,17 @@ def configure_orm(disable_connection_pool=False,
pool_class=None):
engine_args = prepare_engine_args(disable_connection_pool, pool_class)
connect_args = _get_connect_args("sync")
- if conn_str.startswith("sqlite"):
+ if SQL_ALCHEMY_CONN.startswith("sqlite"):
# FastAPI runs sync endpoints in a separate thread. SQLite does not
allow
# to use objects created in another threads by default. Allowing that
in test
# to so the `test` thread and the tested endpoints can use common
objects.
connect_args["check_same_thread"] = False
- engine = create_metadata_engine(conn_str, engine_args=engine_args,
connect_args=connect_args)
+ engine = create_metadata_engine(
+ SQL_ALCHEMY_CONN,
+ engine_args=engine_args,
+ connect_args=connect_args,
+ )
_configure_async_session()
mask_secret(engine.url.password)
setup_event_handlers(engine)
@@ -558,9 +530,8 @@ def prepare_engine_args(disable_connection_pool=False,
pool_class=None):
}
default_args = {}
- conn_str = get_sql_alchemy_conn()
for dialect, default in DEFAULT_ENGINE_ARGS.items():
- if conn_str.startswith(dialect):
+ if SQL_ALCHEMY_CONN.startswith(dialect):
default_args = default.copy()
break
@@ -572,7 +543,7 @@ def prepare_engine_args(disable_connection_pool=False,
pool_class=None):
elif disable_connection_pool or not conf.getboolean("database",
"SQL_ALCHEMY_POOL_ENABLED"):
engine_args["poolclass"] = NullPool
log.debug("settings.prepare_engine_args(): Using NullPool")
- elif _is_sqlite_in_memory(conn_str):
+ elif _is_sqlite_in_memory(SQL_ALCHEMY_CONN):
# In-memory SQLite uses SingletonThreadPool which doesn't support
pool_size/max_overflow.
log.debug("settings.prepare_engine_args(): Skipping pool settings for
in-memory SQLite")
else:
@@ -625,7 +596,7 @@ def prepare_engine_args(disable_connection_pool=False,
pool_class=None):
# 'READ COMMITTED' is the default value for PostgreSQL.
# More information here:
#
https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
- if conn_str.startswith("mysql"):
+ if SQL_ALCHEMY_CONN.startswith("mysql"):
engine_args["isolation_level"] = "READ COMMITTED"
return engine_args
@@ -674,13 +645,12 @@ def configure_adapters():
"""Register Adapters and DB Converters."""
from pendulum import DateTime as Pendulum
- conn_str = get_sql_alchemy_conn()
- if conn_str.startswith("sqlite"):
+ if SQL_ALCHEMY_CONN.startswith("sqlite"):
from sqlite3 import register_adapter
register_adapter(Pendulum, lambda val: val.isoformat(" "))
- if conn_str.startswith("mysql"):
+ if SQL_ALCHEMY_CONN.startswith("mysql"):
try:
try:
import MySQLdb.converters
@@ -757,22 +727,6 @@ def __getattr__(name: str):
from airflow.exceptions import RemovedInAirflow4Warning
- if name == "SQL_ALCHEMY_CONN":
- warnings.warn(
- "settings.SQL_ALCHEMY_CONN has been replaced by
get_sql_alchemy_conn(). This shim is just for compatibility. "
- "Please upgrade your provider or integration.",
- RemovedInAirflow4Warning,
- stacklevel=2,
- )
- return get_sql_alchemy_conn()
- if name == "SQL_ALCHEMY_CONN_ASYNC":
- warnings.warn(
- "settings.SQL_ALCHEMY_CONN_ASYNC has been replaced by
get_sql_alchemy_conn_async(). "
- "This shim is just for compatibility. Please upgrade your provider
or integration.",
- RemovedInAirflow4Warning,
- stacklevel=2,
- )
- return get_sql_alchemy_conn_async()
if name == "MASK_SECRETS_IN_LOGS":
warnings.warn(
"settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns
default value of False. "
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index e611be90d0b..4caa9901bfb 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -654,8 +654,7 @@ class AutocommitEngineForMySQL:
"""
def __init__(self):
- conn_str = settings.get_sql_alchemy_conn()
- self.is_mysql = conn_str and conn_str.lower().startswith("mysql")
+ self.is_mysql = settings.SQL_ALCHEMY_CONN and
settings.SQL_ALCHEMY_CONN.lower().startswith("mysql")
self.original_prepare_engine_args = None
def __enter__(self):
@@ -880,7 +879,7 @@ def _get_alembic_config():
else:
config = Config(os.path.join(package_dir, alembic_file))
config.set_main_option("script_location", directory.replace("%", "%%"))
- config.set_main_option("sqlalchemy.url",
settings.get_sql_alchemy_conn().replace("%", "%%"))
+ config.set_main_option("sqlalchemy.url",
settings.SQL_ALCHEMY_CONN.replace("%", "%%"))
return config
@@ -1245,6 +1244,9 @@ def upgradedb(
if from_revision and not show_sql_only:
raise AirflowException("`from_revision` only supported with
`sql_only=True`.")
+ if not settings.SQL_ALCHEMY_CONN:
+ raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is a
critical assertion.")
+
from alembic import command
import_all_models()
@@ -1381,6 +1383,9 @@ def downgrade(*, to_revision, from_revision=None,
show_sql_only=False, session:
"downgrade from current revision."
)
+ if not settings.SQL_ALCHEMY_CONN:
+ raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")
+
# alembic adds significant import time, so we import it lazily
from alembic import command
diff --git a/airflow-core/src/airflow/utils/db_manager.py
b/airflow-core/src/airflow/utils/db_manager.py
index 8aa76bd131b..57173341e88 100644
--- a/airflow-core/src/airflow/utils/db_manager.py
+++ b/airflow-core/src/airflow/utils/db_manager.py
@@ -100,7 +100,7 @@ class BaseDBManager(LoggingMixin):
config = Config(self.alembic_file)
config.set_main_option("script_location",
self.migration_dir.replace("%", "%%"))
- config.set_main_option("sqlalchemy.url",
settings.get_sql_alchemy_conn().replace("%", "%%"))
+ config.set_main_option("sqlalchemy.url",
settings.SQL_ALCHEMY_CONN.replace("%", "%%"))
return config
def get_script_object(self, config=None) -> ScriptDirectory:
diff --git a/airflow-core/tests/unit/core/test_settings.py
b/airflow-core/tests/unit/core/test_settings.py
index 0126e9a4f40..588567181c5 100644
--- a/airflow-core/tests/unit/core/test_settings.py
+++ b/airflow-core/tests/unit/core/test_settings.py
@@ -243,7 +243,7 @@ class TestMetadataEngineHooks:
with (
patch("os.environ", {"_AIRFLOW_SKIP_DB_TESTS": "false"}),
- patch("airflow.settings._AirflowSettings.sql_alchemy_conn",
"sqlite://"),
+ patch("airflow.settings.SQL_ALCHEMY_CONN", "sqlite://"),
patch("airflow.settings.Session"),
patch("airflow.settings.engine"),
patch("airflow.settings.setup_event_handlers"),
@@ -269,7 +269,7 @@ class TestMetadataEngineHooks:
mock_create_async_engine.return_value = MagicMock()
with (
- patch("airflow.settings._AirflowSettings.sql_alchemy_conn_async",
"sqlite+aiosqlite://"),
+ patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"sqlite+aiosqlite://"),
patch("airflow.settings.conf") as mock_conf,
):
# Pool enabled but sqlite -- pool args should be skipped
@@ -291,10 +291,7 @@ class TestMetadataEngineHooks:
mock_create_async_engine.return_value = MagicMock()
with (
- patch(
- "airflow.settings._AirflowSettings.sql_alchemy_conn_async",
- "postgresql+asyncpg://localhost/airflow",
- ),
+ patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"postgresql+asyncpg://localhost/airflow"),
patch("airflow.settings.conf") as mock_conf,
):
mock_conf.getint.side_effect = lambda section, key, fallback=None:
{
@@ -320,10 +317,7 @@ class TestMetadataEngineHooks:
mock_create_async_engine.return_value = MagicMock()
with (
- patch(
- "airflow.settings._AirflowSettings.sql_alchemy_conn_async",
- "postgresql+asyncpg://localhost/airflow",
- ),
+ patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"postgresql+asyncpg://localhost/airflow"),
patch("airflow.settings.conf") as mock_conf,
):
mock_conf.getboolean.return_value = False
@@ -336,10 +330,10 @@ class TestMetadataEngineHooks:
@patch("airflow.settings.create_async_metadata_engine")
def test_configure_async_session_skips_when_no_async_conn(self,
mock_create_async_engine):
- """_configure_async_session() must not call the hook when
``sql_alchemy_conn_async`` is empty."""
+ """_configure_async_session() must not call the hook when
SQL_ALCHEMY_CONN_ASYNC is empty."""
from airflow import settings
- with patch("airflow.settings._AirflowSettings.sql_alchemy_conn_async",
""):
+ with patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", ""):
settings._configure_async_session()
assert mock_create_async_engine.mock_calls == []
@@ -432,7 +426,7 @@ def test_sqlite_relative_path(value, expectation):
with (
patch("os.environ", {"_AIRFLOW_SKIP_DB_TESTS": "true"}),
- patch("airflow.settings._AirflowSettings.sql_alchemy_conn", value),
+ patch("airflow.settings.SQL_ALCHEMY_CONN", value),
patch("airflow.settings.Session"),
patch("airflow.settings.engine"),
):
diff --git a/airflow-core/tests/unit/core/test_sqlalchemy_config.py
b/airflow-core/tests/unit/core/test_sqlalchemy_config.py
index 15dbb5a1148..db211bab2c5 100644
--- a/airflow-core/tests/unit/core/test_sqlalchemy_config.py
+++ b/airflow-core/tests/unit/core/test_sqlalchemy_config.py
@@ -38,8 +38,8 @@ class TestSqlAlchemySettings:
try:
with pytest.MonkeyPatch.context() as mp:
mp.setattr(
- settings._AirflowSettings,
- "sql_alchemy_conn",
+ settings,
+ "SQL_ALCHEMY_CONN",
"mysql+foobar://user:pass@host/dbname?inline=param&another=param",
)
yield
@@ -56,7 +56,7 @@ class TestSqlAlchemySettings:
settings.configure_orm()
expected_kwargs = dict(
connect_args={}
- if not settings.get_sql_alchemy_conn().startswith("sqlite")
+ if not settings.SQL_ALCHEMY_CONN.startswith("sqlite")
else {"check_same_thread": False},
max_overflow=10,
pool_pre_ping=True,
@@ -66,7 +66,7 @@ class TestSqlAlchemySettings:
future=True,
)
mock_create_engine.assert_called_once_with(
- settings.get_sql_alchemy_conn(),
+ settings.SQL_ALCHEMY_CONN,
**expected_kwargs,
)
@@ -83,7 +83,7 @@ class TestSqlAlchemySettings:
monkeypatch,
):
"""SQLAlchemy 2.0+ uses QueuePool for file-based SQLite, so pool
settings should be applied."""
- monkeypatch.setattr(settings._AirflowSettings, "sql_alchemy_conn",
"sqlite:////tmp/airflow.db")
+ monkeypatch.setattr(settings, "SQL_ALCHEMY_CONN",
"sqlite:////tmp/airflow.db")
settings.configure_orm()
expected_kwargs = dict(
connect_args={"check_same_thread": False},
@@ -114,7 +114,7 @@ class TestSqlAlchemySettings:
)
def test_prepare_engine_args_sqlite_in_memory_skips_pool_settings(self,
conn_str, monkeypatch):
"""In-memory SQLite uses SingletonThreadPool which doesn't support
pool_size/max_overflow."""
- monkeypatch.setattr(settings._AirflowSettings, "sql_alchemy_conn",
conn_str)
+ monkeypatch.setattr(settings, "SQL_ALCHEMY_CONN", conn_str)
engine_args = settings.prepare_engine_args()
assert "pool_size" not in engine_args
assert "max_overflow" not in engine_args
@@ -139,7 +139,7 @@ class TestSqlAlchemySettings:
with conf_vars(config):
settings.configure_orm()
engine_args = {"arg": 1}
- if settings.get_sql_alchemy_conn().startswith("mysql"):
+ if settings.SQL_ALCHEMY_CONN.startswith("mysql"):
engine_args["isolation_level"] = "READ COMMITTED"
expected_kwargs = dict(
connect_args=SQL_ALCHEMY_CONNECT_ARGS,
@@ -148,7 +148,7 @@ class TestSqlAlchemySettings:
**engine_args,
)
mock_create_engine.assert_called_once_with(
- settings.get_sql_alchemy_conn(),
+ settings.SQL_ALCHEMY_CONN,
**expected_kwargs,
)
diff --git a/airflow-core/tests/unit/utils/test_db.py
b/airflow-core/tests/unit/utils/test_db.py
index 600f08a548e..d2f13e27e9e 100644
--- a/airflow-core/tests/unit/utils/test_db.py
+++ b/airflow-core/tests/unit/utils/test_db.py
@@ -406,9 +406,7 @@ class TestAutocommitEngineForMySQL:
def test_non_mysql_database_is_noop(self, mocker):
"""Test that non-MySQL databases don't trigger any changes."""
# Mock settings to use PostgreSQL
- mocker.patch.object(
- settings._AirflowSettings, "sql_alchemy_conn",
"postgresql://user:pass@localhost/db"
- )
+ mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"postgresql://user:pass@localhost/db")
mock_dispose = mocker.patch.object(settings, "dispose_orm")
mock_configure = mocker.patch.object(settings, "configure_orm")
@@ -430,9 +428,7 @@ class TestAutocommitEngineForMySQL:
def test_mysql_database_modifies_engine(self, mocker):
"""Test that MySQL databases trigger AUTOCOMMIT mode."""
# Mock settings to use MySQL
- mocker.patch.object(
- settings._AirflowSettings, "sql_alchemy_conn",
"mysql+mysqlconnector://user:pass@localhost/db"
- )
+ mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"mysql+mysqlconnector://user:pass@localhost/db")
mock_dispose = mocker.patch.object(settings, "dispose_orm")
mock_configure = mocker.patch.object(settings, "configure_orm")
@@ -474,7 +470,7 @@ class TestAutocommitEngineForMySQL:
]
for conn_string in mysql_variants:
- mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
conn_string)
+ mocker.patch.object(settings, "SQL_ALCHEMY_CONN", conn_string)
mock_dispose.reset_mock()
mock_configure.reset_mock()
@@ -489,21 +485,18 @@ class TestAutocommitEngineForMySQL:
def test_none_sql_alchemy_conn(self, mocker):
"""Test behavior when SQL_ALCHEMY_CONN is None."""
- mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
None)
+ mocker.patch.object(settings, "SQL_ALCHEMY_CONN", None)
mock_dispose = mocker.patch.object(settings, "dispose_orm")
mock_configure = mocker.patch.object(settings, "configure_orm")
- with pytest.raises(RuntimeError, match="connection string not
configured"):
- with AutocommitEngineForMySQL():
- pass
-
- # Failure should not trigger any ORM disposal or reconfiguration side
effects.
- mock_dispose.assert_not_called()
- mock_configure.assert_not_called()
+ with AutocommitEngineForMySQL():
+ # Should be a no-op when connection string is None
+ mock_dispose.assert_not_called()
+ mock_configure.assert_not_called()
def test_prepare_engine_args_with_parameters(self, mocker):
"""Test that prepare_engine_args parameters are properly forwarded."""
- mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
"mysql://user:pass@localhost/db")
+ mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"mysql://user:pass@localhost/db")
mocker.patch.object(settings, "dispose_orm")
mocker.patch.object(settings, "configure_orm")
@@ -534,7 +527,7 @@ class TestAutocommitEngineForMySQL:
def test_exception_during_context(self, mocker):
"""Test that cleanup happens even if an exception occurs."""
- mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
"mysql://user:pass@localhost/db")
+ mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"mysql://user:pass@localhost/db")
mock_dispose = mocker.patch.object(settings, "dispose_orm")
mock_configure = mocker.patch.object(settings, "configure_orm")
original_prepare = mocker.Mock()
@@ -557,7 +550,7 @@ class TestAutocommitEngineForMySQL:
def test_logging_messages(self, mocker):
"""Test that appropriate log messages are generated."""
- mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
"mysql://user:pass@localhost/db")
+ mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"mysql://user:pass@localhost/db")
mocker.patch.object(settings, "dispose_orm")
mocker.patch.object(settings, "configure_orm")
@@ -578,8 +571,7 @@ class TestAutocommitEngineForMySQL:
"""Test integration using a fully mocked settings module."""
# Setup mock settings module
mock_settings = mocker.Mock()
- mock_settings._AirflowSettings = mocker.Mock()
- mock_settings._AirflowSettings.sql_alchemy_conn =
"mysql://user:pass@localhost/db"
+ mock_settings.SQL_ALCHEMY_CONN = "mysql://user:pass@localhost/db"
mock_settings.dispose_orm = mocker.Mock()
mock_settings.configure_orm = mocker.Mock()
original_prepare = mocker.Mock(return_value={"test": "value"})
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
index a62cbcc409a..10b83612339 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
@@ -24,7 +24,6 @@ from sqlalchemy.engine.url import make_url
from airflow import settings
from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.fab.auth_manager.models import model_metadata
-from airflow.providers.fab.version_compat import AIRFLOW_V_3_3_PLUS
from airflow.utils.db import _offline_migration, print_happy_cat
from airflow.utils.db_manager import BaseDBManager
@@ -68,8 +67,7 @@ class FABDBManager(BaseDBManager):
def create_db_from_orm(self):
super().create_db_from_orm()
- sql_conn = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else
settings.SQL_ALCHEMY_CONN
- db, flask_app = _get_flask_db(sql_conn)
+ db, flask_app = _get_flask_db(settings.SQL_ALCHEMY_CONN)
with flask_app.app_context():
db.create_all()
@@ -129,6 +127,8 @@ class FABDBManager(BaseDBManager):
_release_metadata_locks_if_supported(self)
# alembic adds significant import time, so we import it lazily
+ if not settings.SQL_ALCHEMY_CONN:
+ raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is
a critical assertion.")
from alembic import command
current_revision = self.get_current_revision()
@@ -147,10 +147,7 @@ class FABDBManager(BaseDBManager):
config = self.get_alembic_config()
if show_sql_only:
- sql_conn: str = (
- settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else
str(settings.SQL_ALCHEMY_CONN)
- )
- if make_url(sql_conn).get_backend_name() == "sqlite":
+ if make_url(settings.SQL_ALCHEMY_CONN).get_backend_name() ==
"sqlite":
raise SystemExit("Offline migration not supported for SQLite.")
if not from_revision:
from_revision = current_revision
@@ -175,6 +172,9 @@ class FABDBManager(BaseDBManager):
"downgrade from current revision."
)
+ if not settings.SQL_ALCHEMY_CONN:
+ raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")
+
# alembic adds significant import time, so we import it lazily
from alembic import command
@@ -200,7 +200,6 @@ class FABDBManager(BaseDBManager):
def drop_tables(self, connection):
super().drop_tables(connection)
- sql_conn = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else
settings.SQL_ALCHEMY_CONN
- db, flask_app = _get_flask_db(sql_conn)
+ db, flask_app = _get_flask_db(settings.SQL_ALCHEMY_CONN)
with flask_app.app_context():
db.drop_all()
diff --git a/providers/fab/src/airflow/providers/fab/migrations/env.py
b/providers/fab/src/airflow/providers/fab/migrations/env.py
index 7a812c56e7a..903057ba602 100644
--- a/providers/fab/src/airflow/providers/fab/migrations/env.py
+++ b/providers/fab/src/airflow/providers/fab/migrations/env.py
@@ -24,7 +24,6 @@ from alembic import context
from airflow import settings
from airflow.providers.fab.auth_manager.models.db import FABDBManager
-from airflow.providers.fab.version_compat import AIRFLOW_V_3_3_PLUS
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
@@ -68,9 +67,8 @@ def run_migrations_offline():
script output.
"""
- url: str = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else
str(settings.SQL_ALCHEMY_CONN)
context.configure(
- url=url,
+ url=settings.SQL_ALCHEMY_CONN,
target_metadata=target_metadata,
literal_binds=True,
compare_type=True,
diff --git a/providers/fab/src/airflow/providers/fab/version_compat.py
b/providers/fab/src/airflow/providers/fab/version_compat.py
index 419ebf48568..92feca19762 100644
--- a/providers/fab/src/airflow/providers/fab/version_compat.py
+++ b/providers/fab/src/airflow/providers/fab/version_compat.py
@@ -36,4 +36,3 @@ AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3,
1, 0)
AIRFLOW_V_3_1_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 1)
AIRFLOW_V_3_1_8_PLUS = get_base_airflow_version_tuple() >= (3, 1, 8)
AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
-AIRFLOW_V_3_3_PLUS = get_base_airflow_version_tuple() >= (3, 3, 0)
diff --git a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
index 69154acd5b4..452e5522fb7 100644
--- a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
+++ b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
@@ -32,8 +32,6 @@ from airflow.utils.db import (
compare_type,
)
-from tests_common.test_utils.config import conf_vars
-
pytestmark = [pytest.mark.db_test]
try:
from airflow.providers.fab.auth_manager.models.db import FABDBManager
@@ -111,11 +109,14 @@ try:
actual = mock_om.call_args.kwargs["revision"]
assert actual == "abc"
- @conf_vars({("database", "sql_alchemy_conn"):
"sqlite:////tmp/fab-test.db"})
@mock.patch.object(FABDBManager, "get_current_revision")
def test_sqlite_offline_upgrade_raises_with_revision(self, mock_gcr,
session):
- with pytest.raises(SystemExit, match="Offline migration not
supported for SQLite"):
- FABDBManager(session).upgradedb(from_revision=None,
to_revision=None, show_sql_only=True)
+ with mock.patch(
+
"airflow.providers.fab.auth_manager.models.db.settings.SQL_ALCHEMY_CONN",
+ "sqlite:////tmp/fab-test.db",
+ ):
+ with pytest.raises(SystemExit, match="Offline migration not
supported for SQLite"):
+ FABDBManager(session).upgradedb(from_revision=None,
to_revision=None, show_sql_only=True)
@mock.patch("alembic.command.upgrade")
@mock.patch.object(FABDBManager, "create_db_from_orm")
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 2269f22420f..3e6236c5786 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -344,7 +344,6 @@ def block_orm_access():
conf.set("database", "sql_alchemy_conn", conn)
conf.set("database", "sql_alchemy_conn_cmd", "/bin/false")
conf.set("database", "sql_alchemy_conn_secret",
"db-access-blocked")
- settings.block_orm_access()
# This only gets called when the module does not already have an
attribute, and for these values
# lets give a custom error message
@@ -355,6 +354,9 @@ def block_orm_access():
settings.__getattr__ = __getattr__
+ settings.SQL_ALCHEMY_CONN = conn
+ settings.SQL_ALCHEMY_CONN_ASYNC = conn
+
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = conn
os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = conn