This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cf709d929a0 Remove global variables in airflow.settings (#67070)
cf709d929a0 is described below
commit cf709d929a0faf22420f6a207da31eb5d729483f
Author: Jens Scheffler <[email protected]>
AuthorDate: Mon May 18 00:57:54 2026 +0200
Remove global variables in airflow.settings (#67070)
---
airflow-core/src/airflow/migrations/env.py | 2 +-
airflow-core/src/airflow/settings.py | 94 ++++++++++++++++------
airflow-core/src/airflow/utils/db.py | 11 +--
airflow-core/src/airflow/utils/db_manager.py | 2 +-
airflow-core/tests/unit/core/test_settings.py | 20 +++--
.../tests/unit/core/test_sqlalchemy_config.py | 16 ++--
airflow-core/tests/unit/utils/test_db.py | 32 +++++---
.../providers/fab/auth_manager/models/db.py | 17 ++--
.../src/airflow/providers/fab/migrations/env.py | 4 +-
.../src/airflow/providers/fab/version_compat.py | 1 +
.../tests/unit/fab/auth_manager/models/test_db.py | 11 ++-
.../src/airflow/sdk/execution_time/supervisor.py | 4 +-
12 files changed, 135 insertions(+), 79 deletions(-)
diff --git a/airflow-core/src/airflow/migrations/env.py
b/airflow-core/src/airflow/migrations/env.py
index dfb46ab6c37..2f4f8307cc8 100644
--- a/airflow-core/src/airflow/migrations/env.py
+++ b/airflow-core/src/airflow/migrations/env.py
@@ -80,7 +80,7 @@ def run_migrations_offline():
"""
context.configure(
- url=settings.SQL_ALCHEMY_CONN,
+ url=settings.get_sql_alchemy_conn(),
target_metadata=target_metadata,
literal_binds=True,
compare_type=compare_type,
diff --git a/airflow-core/src/airflow/settings.py
b/airflow-core/src/airflow/settings.py
index 739e2437f6a..a824227b23b 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -110,8 +110,15 @@ HEADER = "\n".join(
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")
-SQL_ALCHEMY_CONN: str | None = None
-SQL_ALCHEMY_CONN_ASYNC: str | None = None
+
+class _AirflowSettings:
+ """Class to hold Airflow settings. This is used to avoid circular imports
and to allow for lazy loading of settings."""
+
+ block_orm_access: bool = False
+ sql_alchemy_conn: str | None = None
+ sql_alchemy_conn_async: str | None = None
+
+
PLUGINS_FOLDER: str | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core",
"DAGS_FOLDER"))
@@ -126,7 +133,25 @@ async_engine: AsyncEngine | None = None
AsyncSession: Callable[..., SAAsyncSession] | None = None
-def get_engine():
+def get_sql_alchemy_conn() -> str:
+ """Get the configured SQLAlchemy connection string, raising an error if
not configured."""
+ if _AirflowSettings.block_orm_access:
+ raise AttributeError("Access to the Airflow Metadatabase from dags is
not allowed!")
+ if _AirflowSettings.sql_alchemy_conn is None:
+ raise RuntimeError("SQLAlchemy connection string not configured. Call
configure_vars() first.")
+ return _AirflowSettings.sql_alchemy_conn
+
+
+def get_sql_alchemy_conn_async() -> str:
+ """Get the configured SQLAlchemy connection string, raising an error if
not configured."""
+ if _AirflowSettings.block_orm_access:
+ raise AttributeError("Access to the Airflow Metadatabase from dags is
not allowed!")
+ if _AirflowSettings.sql_alchemy_conn_async is None:
+ raise RuntimeError("SQLAlchemy async connection string not configured.
Call configure_vars() first.")
+ return _AirflowSettings.sql_alchemy_conn_async
+
+
+def get_engine() -> Engine:
"""Get the configured engine, raising an error if not configured."""
if engine is None:
raise RuntimeError("Engine not configured. Call configure_orm()
first.")
@@ -140,6 +165,11 @@ def get_session():
return Session
+def block_orm_access():
+ """Block access to the ORM by marking state."""
+ _AirflowSettings.block_orm_access = True
+
+
# The JSON library to use for DAG Serialization and De-Serialization
json = json_lib
@@ -250,16 +280,16 @@ def _get_async_conn_uri_from_sync(sync_uri):
def configure_vars():
"""Configure Global Variables from airflow.cfg."""
- global SQL_ALCHEMY_CONN
- global SQL_ALCHEMY_CONN_ASYNC
global DAGS_FOLDER
global PLUGINS_FOLDER
- SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
+ _AirflowSettings.sql_alchemy_conn = conf.get("database",
"sql_alchemy_conn")
if conf.has_option("database", "sql_alchemy_conn_async"):
- SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
+ _AirflowSettings.sql_alchemy_conn_async = conf.get("database",
"sql_alchemy_conn_async")
else:
- SQL_ALCHEMY_CONN_ASYNC =
_get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)
+ _AirflowSettings.sql_alchemy_conn_async =
_get_async_conn_uri_from_sync(
+ sync_uri=_AirflowSettings.sql_alchemy_conn
+ )
DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
@@ -402,7 +432,8 @@ def _configure_async_session() -> None:
"""
global AsyncSession, async_engine
- if not SQL_ALCHEMY_CONN_ASYNC:
+ async_conn = _AirflowSettings.sql_alchemy_conn_async
+ if not async_conn:
async_engine = None
AsyncSession = None
return
@@ -414,14 +445,14 @@ def _configure_async_session() -> None:
engine_args: dict[str, Any] = {}
if not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
engine_args["poolclass"] = NullPool
- elif not SQL_ALCHEMY_CONN_ASYNC.startswith("sqlite"):
+ elif not async_conn.startswith("sqlite"):
engine_args["pool_size"] = conf.getint("database",
"SQL_ALCHEMY_POOL_SIZE", fallback=5)
engine_args["pool_recycle"] = conf.getint("database",
"SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)
engine_args["pool_pre_ping"] = conf.getboolean("database",
"SQL_ALCHEMY_POOL_PRE_PING", fallback=True)
engine_args["max_overflow"] = conf.getint("database",
"SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)
async_engine = create_async_metadata_engine(
- SQL_ALCHEMY_CONN_ASYNC,
+ async_conn,
connect_args=_get_connect_args("async"),
engine_args=engine_args,
)
@@ -437,11 +468,12 @@ def configure_orm(disable_connection_pool=False,
pool_class=None):
"""Configure ORM using SQLAlchemy."""
from airflow._shared.secrets_masker import mask_secret
- if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
+ conn_str = get_sql_alchemy_conn()
+ if _is_sqlite_db_path_relative(conn_str):
from airflow.exceptions import AirflowConfigException
raise AirflowConfigException(
- f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to
sqlite. "
+ f"Cannot use relative path: `{conn_str}` to connect to sqlite. "
"Please use absolute path such as `sqlite:////tmp/airflow.db`."
)
@@ -458,17 +490,13 @@ def configure_orm(disable_connection_pool=False,
pool_class=None):
engine_args = prepare_engine_args(disable_connection_pool, pool_class)
connect_args = _get_connect_args("sync")
- if SQL_ALCHEMY_CONN.startswith("sqlite"):
+ if conn_str.startswith("sqlite"):
# FastAPI runs sync endpoints in a separate thread. SQLite does not
allow
# to use objects created in another threads by default. Allowing that
in test
# to so the `test` thread and the tested endpoints can use common
objects.
connect_args["check_same_thread"] = False
- engine = create_metadata_engine(
- SQL_ALCHEMY_CONN,
- engine_args=engine_args,
- connect_args=connect_args,
- )
+ engine = create_metadata_engine(conn_str, engine_args=engine_args,
connect_args=connect_args)
_configure_async_session()
mask_secret(engine.url.password)
setup_event_handlers(engine)
@@ -530,8 +558,9 @@ def prepare_engine_args(disable_connection_pool=False,
pool_class=None):
}
default_args = {}
+ conn_str = get_sql_alchemy_conn()
for dialect, default in DEFAULT_ENGINE_ARGS.items():
- if SQL_ALCHEMY_CONN.startswith(dialect):
+ if conn_str.startswith(dialect):
default_args = default.copy()
break
@@ -543,7 +572,7 @@ def prepare_engine_args(disable_connection_pool=False,
pool_class=None):
elif disable_connection_pool or not conf.getboolean("database",
"SQL_ALCHEMY_POOL_ENABLED"):
engine_args["poolclass"] = NullPool
log.debug("settings.prepare_engine_args(): Using NullPool")
- elif _is_sqlite_in_memory(SQL_ALCHEMY_CONN):
+ elif _is_sqlite_in_memory(conn_str):
# In-memory SQLite uses SingletonThreadPool which doesn't support
pool_size/max_overflow.
log.debug("settings.prepare_engine_args(): Skipping pool settings for
in-memory SQLite")
else:
@@ -596,7 +625,7 @@ def prepare_engine_args(disable_connection_pool=False,
pool_class=None):
# 'READ COMMITTED' is the default value for PostgreSQL.
# More information here:
#
https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
- if SQL_ALCHEMY_CONN.startswith("mysql"):
+ if conn_str.startswith("mysql"):
engine_args["isolation_level"] = "READ COMMITTED"
return engine_args
@@ -645,12 +674,13 @@ def configure_adapters():
"""Register Adapters and DB Converters."""
from pendulum import DateTime as Pendulum
- if SQL_ALCHEMY_CONN.startswith("sqlite"):
+ conn_str = get_sql_alchemy_conn()
+ if conn_str.startswith("sqlite"):
from sqlite3 import register_adapter
register_adapter(Pendulum, lambda val: val.isoformat(" "))
- if SQL_ALCHEMY_CONN.startswith("mysql"):
+ if conn_str.startswith("mysql"):
try:
try:
import MySQLdb.converters
@@ -727,6 +757,22 @@ def __getattr__(name: str):
from airflow.exceptions import RemovedInAirflow4Warning
+ if name == "SQL_ALCHEMY_CONN":
+ warnings.warn(
+ "settings.SQL_ALCHEMY_CONN has been replaced by
get_sql_alchemy_conn(). This shim is just for compatibility. "
+ "Please upgrade your provider or integration.",
+ RemovedInAirflow4Warning,
+ stacklevel=2,
+ )
+ return get_sql_alchemy_conn()
+ if name == "SQL_ALCHEMY_CONN_ASYNC":
+ warnings.warn(
+ "settings.SQL_ALCHEMY_CONN_ASYNC has been replaced by
get_sql_alchemy_conn_async(). "
+ "This shim is just for compatibility. Please upgrade your provider
or integration.",
+ RemovedInAirflow4Warning,
+ stacklevel=2,
+ )
+ return get_sql_alchemy_conn_async()
if name == "MASK_SECRETS_IN_LOGS":
warnings.warn(
"settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns
default value of False. "
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index 4caa9901bfb..e611be90d0b 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -654,7 +654,8 @@ class AutocommitEngineForMySQL:
"""
def __init__(self):
- self.is_mysql = settings.SQL_ALCHEMY_CONN and
settings.SQL_ALCHEMY_CONN.lower().startswith("mysql")
+ conn_str = settings.get_sql_alchemy_conn()
+ self.is_mysql = conn_str and conn_str.lower().startswith("mysql")
self.original_prepare_engine_args = None
def __enter__(self):
@@ -879,7 +880,7 @@ def _get_alembic_config():
else:
config = Config(os.path.join(package_dir, alembic_file))
config.set_main_option("script_location", directory.replace("%", "%%"))
- config.set_main_option("sqlalchemy.url",
settings.SQL_ALCHEMY_CONN.replace("%", "%%"))
+ config.set_main_option("sqlalchemy.url",
settings.get_sql_alchemy_conn().replace("%", "%%"))
return config
@@ -1244,9 +1245,6 @@ def upgradedb(
if from_revision and not show_sql_only:
raise AirflowException("`from_revision` only supported with
`sql_only=True`.")
- if not settings.SQL_ALCHEMY_CONN:
- raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is a
critical assertion.")
-
from alembic import command
import_all_models()
@@ -1383,9 +1381,6 @@ def downgrade(*, to_revision, from_revision=None,
show_sql_only=False, session:
"downgrade from current revision."
)
- if not settings.SQL_ALCHEMY_CONN:
- raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")
-
# alembic adds significant import time, so we import it lazily
from alembic import command
diff --git a/airflow-core/src/airflow/utils/db_manager.py
b/airflow-core/src/airflow/utils/db_manager.py
index 57173341e88..8aa76bd131b 100644
--- a/airflow-core/src/airflow/utils/db_manager.py
+++ b/airflow-core/src/airflow/utils/db_manager.py
@@ -100,7 +100,7 @@ class BaseDBManager(LoggingMixin):
config = Config(self.alembic_file)
config.set_main_option("script_location",
self.migration_dir.replace("%", "%%"))
- config.set_main_option("sqlalchemy.url",
settings.SQL_ALCHEMY_CONN.replace("%", "%%"))
+ config.set_main_option("sqlalchemy.url",
settings.get_sql_alchemy_conn().replace("%", "%%"))
return config
def get_script_object(self, config=None) -> ScriptDirectory:
diff --git a/airflow-core/tests/unit/core/test_settings.py
b/airflow-core/tests/unit/core/test_settings.py
index 588567181c5..0126e9a4f40 100644
--- a/airflow-core/tests/unit/core/test_settings.py
+++ b/airflow-core/tests/unit/core/test_settings.py
@@ -243,7 +243,7 @@ class TestMetadataEngineHooks:
with (
patch("os.environ", {"_AIRFLOW_SKIP_DB_TESTS": "false"}),
- patch("airflow.settings.SQL_ALCHEMY_CONN", "sqlite://"),
+ patch("airflow.settings._AirflowSettings.sql_alchemy_conn",
"sqlite://"),
patch("airflow.settings.Session"),
patch("airflow.settings.engine"),
patch("airflow.settings.setup_event_handlers"),
@@ -269,7 +269,7 @@ class TestMetadataEngineHooks:
mock_create_async_engine.return_value = MagicMock()
with (
- patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"sqlite+aiosqlite://"),
+ patch("airflow.settings._AirflowSettings.sql_alchemy_conn_async",
"sqlite+aiosqlite://"),
patch("airflow.settings.conf") as mock_conf,
):
# Pool enabled but sqlite -- pool args should be skipped
@@ -291,7 +291,10 @@ class TestMetadataEngineHooks:
mock_create_async_engine.return_value = MagicMock()
with (
- patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"postgresql+asyncpg://localhost/airflow"),
+ patch(
+ "airflow.settings._AirflowSettings.sql_alchemy_conn_async",
+ "postgresql+asyncpg://localhost/airflow",
+ ),
patch("airflow.settings.conf") as mock_conf,
):
mock_conf.getint.side_effect = lambda section, key, fallback=None:
{
@@ -317,7 +320,10 @@ class TestMetadataEngineHooks:
mock_create_async_engine.return_value = MagicMock()
with (
- patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"postgresql+asyncpg://localhost/airflow"),
+ patch(
+ "airflow.settings._AirflowSettings.sql_alchemy_conn_async",
+ "postgresql+asyncpg://localhost/airflow",
+ ),
patch("airflow.settings.conf") as mock_conf,
):
mock_conf.getboolean.return_value = False
@@ -330,10 +336,10 @@ class TestMetadataEngineHooks:
@patch("airflow.settings.create_async_metadata_engine")
def test_configure_async_session_skips_when_no_async_conn(self,
mock_create_async_engine):
- """_configure_async_session() must not call the hook when
SQL_ALCHEMY_CONN_ASYNC is empty."""
+ """_configure_async_session() must not call the hook when
``sql_alchemy_conn_async`` is empty."""
from airflow import settings
- with patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", ""):
+ with patch("airflow.settings._AirflowSettings.sql_alchemy_conn_async",
""):
settings._configure_async_session()
assert mock_create_async_engine.mock_calls == []
@@ -426,7 +432,7 @@ def test_sqlite_relative_path(value, expectation):
with (
patch("os.environ", {"_AIRFLOW_SKIP_DB_TESTS": "true"}),
- patch("airflow.settings.SQL_ALCHEMY_CONN", value),
+ patch("airflow.settings._AirflowSettings.sql_alchemy_conn", value),
patch("airflow.settings.Session"),
patch("airflow.settings.engine"),
):
diff --git a/airflow-core/tests/unit/core/test_sqlalchemy_config.py
b/airflow-core/tests/unit/core/test_sqlalchemy_config.py
index db211bab2c5..15dbb5a1148 100644
--- a/airflow-core/tests/unit/core/test_sqlalchemy_config.py
+++ b/airflow-core/tests/unit/core/test_sqlalchemy_config.py
@@ -38,8 +38,8 @@ class TestSqlAlchemySettings:
try:
with pytest.MonkeyPatch.context() as mp:
mp.setattr(
- settings,
- "SQL_ALCHEMY_CONN",
+ settings._AirflowSettings,
+ "sql_alchemy_conn",
"mysql+foobar://user:pass@host/dbname?inline=param&another=param",
)
yield
@@ -56,7 +56,7 @@ class TestSqlAlchemySettings:
settings.configure_orm()
expected_kwargs = dict(
connect_args={}
- if not settings.SQL_ALCHEMY_CONN.startswith("sqlite")
+ if not settings.get_sql_alchemy_conn().startswith("sqlite")
else {"check_same_thread": False},
max_overflow=10,
pool_pre_ping=True,
@@ -66,7 +66,7 @@ class TestSqlAlchemySettings:
future=True,
)
mock_create_engine.assert_called_once_with(
- settings.SQL_ALCHEMY_CONN,
+ settings.get_sql_alchemy_conn(),
**expected_kwargs,
)
@@ -83,7 +83,7 @@ class TestSqlAlchemySettings:
monkeypatch,
):
"""SQLAlchemy 2.0+ uses QueuePool for file-based SQLite, so pool
settings should be applied."""
- monkeypatch.setattr(settings, "SQL_ALCHEMY_CONN",
"sqlite:////tmp/airflow.db")
+ monkeypatch.setattr(settings._AirflowSettings, "sql_alchemy_conn",
"sqlite:////tmp/airflow.db")
settings.configure_orm()
expected_kwargs = dict(
connect_args={"check_same_thread": False},
@@ -114,7 +114,7 @@ class TestSqlAlchemySettings:
)
def test_prepare_engine_args_sqlite_in_memory_skips_pool_settings(self,
conn_str, monkeypatch):
"""In-memory SQLite uses SingletonThreadPool which doesn't support
pool_size/max_overflow."""
- monkeypatch.setattr(settings, "SQL_ALCHEMY_CONN", conn_str)
+ monkeypatch.setattr(settings._AirflowSettings, "sql_alchemy_conn",
conn_str)
engine_args = settings.prepare_engine_args()
assert "pool_size" not in engine_args
assert "max_overflow" not in engine_args
@@ -139,7 +139,7 @@ class TestSqlAlchemySettings:
with conf_vars(config):
settings.configure_orm()
engine_args = {"arg": 1}
- if settings.SQL_ALCHEMY_CONN.startswith("mysql"):
+ if settings.get_sql_alchemy_conn().startswith("mysql"):
engine_args["isolation_level"] = "READ COMMITTED"
expected_kwargs = dict(
connect_args=SQL_ALCHEMY_CONNECT_ARGS,
@@ -148,7 +148,7 @@ class TestSqlAlchemySettings:
**engine_args,
)
mock_create_engine.assert_called_once_with(
- settings.SQL_ALCHEMY_CONN,
+ settings.get_sql_alchemy_conn(),
**expected_kwargs,
)
diff --git a/airflow-core/tests/unit/utils/test_db.py
b/airflow-core/tests/unit/utils/test_db.py
index d2f13e27e9e..600f08a548e 100644
--- a/airflow-core/tests/unit/utils/test_db.py
+++ b/airflow-core/tests/unit/utils/test_db.py
@@ -406,7 +406,9 @@ class TestAutocommitEngineForMySQL:
def test_non_mysql_database_is_noop(self, mocker):
"""Test that non-MySQL databases don't trigger any changes."""
# Mock settings to use PostgreSQL
- mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"postgresql://user:pass@localhost/db")
+ mocker.patch.object(
+ settings._AirflowSettings, "sql_alchemy_conn",
"postgresql://user:pass@localhost/db"
+ )
mock_dispose = mocker.patch.object(settings, "dispose_orm")
mock_configure = mocker.patch.object(settings, "configure_orm")
@@ -428,7 +430,9 @@ class TestAutocommitEngineForMySQL:
def test_mysql_database_modifies_engine(self, mocker):
"""Test that MySQL databases trigger AUTOCOMMIT mode."""
# Mock settings to use MySQL
- mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"mysql+mysqlconnector://user:pass@localhost/db")
+ mocker.patch.object(
+ settings._AirflowSettings, "sql_alchemy_conn",
"mysql+mysqlconnector://user:pass@localhost/db"
+ )
mock_dispose = mocker.patch.object(settings, "dispose_orm")
mock_configure = mocker.patch.object(settings, "configure_orm")
@@ -470,7 +474,7 @@ class TestAutocommitEngineForMySQL:
]
for conn_string in mysql_variants:
- mocker.patch.object(settings, "SQL_ALCHEMY_CONN", conn_string)
+ mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
conn_string)
mock_dispose.reset_mock()
mock_configure.reset_mock()
@@ -485,18 +489,21 @@ class TestAutocommitEngineForMySQL:
def test_none_sql_alchemy_conn(self, mocker):
"""Test behavior when SQL_ALCHEMY_CONN is None."""
- mocker.patch.object(settings, "SQL_ALCHEMY_CONN", None)
+ mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
None)
mock_dispose = mocker.patch.object(settings, "dispose_orm")
mock_configure = mocker.patch.object(settings, "configure_orm")
- with AutocommitEngineForMySQL():
- # Should be a no-op when connection string is None
- mock_dispose.assert_not_called()
- mock_configure.assert_not_called()
+ with pytest.raises(RuntimeError, match="connection string not
configured"):
+ with AutocommitEngineForMySQL():
+ pass
+
+ # Failure should not trigger any ORM disposal or reconfiguration side
effects.
+ mock_dispose.assert_not_called()
+ mock_configure.assert_not_called()
def test_prepare_engine_args_with_parameters(self, mocker):
"""Test that prepare_engine_args parameters are properly forwarded."""
- mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"mysql://user:pass@localhost/db")
+ mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
"mysql://user:pass@localhost/db")
mocker.patch.object(settings, "dispose_orm")
mocker.patch.object(settings, "configure_orm")
@@ -527,7 +534,7 @@ class TestAutocommitEngineForMySQL:
def test_exception_during_context(self, mocker):
"""Test that cleanup happens even if an exception occurs."""
- mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"mysql://user:pass@localhost/db")
+ mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
"mysql://user:pass@localhost/db")
mock_dispose = mocker.patch.object(settings, "dispose_orm")
mock_configure = mocker.patch.object(settings, "configure_orm")
original_prepare = mocker.Mock()
@@ -550,7 +557,7 @@ class TestAutocommitEngineForMySQL:
def test_logging_messages(self, mocker):
"""Test that appropriate log messages are generated."""
- mocker.patch.object(settings, "SQL_ALCHEMY_CONN",
"mysql://user:pass@localhost/db")
+ mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn",
"mysql://user:pass@localhost/db")
mocker.patch.object(settings, "dispose_orm")
mocker.patch.object(settings, "configure_orm")
@@ -571,7 +578,8 @@ class TestAutocommitEngineForMySQL:
"""Test integration using a fully mocked settings module."""
# Setup mock settings module
mock_settings = mocker.Mock()
- mock_settings.SQL_ALCHEMY_CONN = "mysql://user:pass@localhost/db"
+ mock_settings._AirflowSettings = mocker.Mock()
+ mock_settings._AirflowSettings.sql_alchemy_conn =
"mysql://user:pass@localhost/db"
mock_settings.dispose_orm = mocker.Mock()
mock_settings.configure_orm = mocker.Mock()
original_prepare = mocker.Mock(return_value={"test": "value"})
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
index 10b83612339..a62cbcc409a 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
@@ -24,6 +24,7 @@ from sqlalchemy.engine.url import make_url
from airflow import settings
from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.fab.auth_manager.models import model_metadata
+from airflow.providers.fab.version_compat import AIRFLOW_V_3_3_PLUS
from airflow.utils.db import _offline_migration, print_happy_cat
from airflow.utils.db_manager import BaseDBManager
@@ -67,7 +68,8 @@ class FABDBManager(BaseDBManager):
def create_db_from_orm(self):
super().create_db_from_orm()
- db, flask_app = _get_flask_db(settings.SQL_ALCHEMY_CONN)
+ sql_conn = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else
settings.SQL_ALCHEMY_CONN
+ db, flask_app = _get_flask_db(sql_conn)
with flask_app.app_context():
db.create_all()
@@ -127,8 +129,6 @@ class FABDBManager(BaseDBManager):
_release_metadata_locks_if_supported(self)
# alembic adds significant import time, so we import it lazily
- if not settings.SQL_ALCHEMY_CONN:
- raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is
a critical assertion.")
from alembic import command
current_revision = self.get_current_revision()
@@ -147,7 +147,10 @@ class FABDBManager(BaseDBManager):
config = self.get_alembic_config()
if show_sql_only:
- if make_url(settings.SQL_ALCHEMY_CONN).get_backend_name() ==
"sqlite":
+ sql_conn: str = (
+ settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else
str(settings.SQL_ALCHEMY_CONN)
+ )
+ if make_url(sql_conn).get_backend_name() == "sqlite":
raise SystemExit("Offline migration not supported for SQLite.")
if not from_revision:
from_revision = current_revision
@@ -172,9 +175,6 @@ class FABDBManager(BaseDBManager):
"downgrade from current revision."
)
- if not settings.SQL_ALCHEMY_CONN:
- raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")
-
# alembic adds significant import time, so we import it lazily
from alembic import command
@@ -200,6 +200,7 @@ class FABDBManager(BaseDBManager):
def drop_tables(self, connection):
super().drop_tables(connection)
- db, flask_app = _get_flask_db(settings.SQL_ALCHEMY_CONN)
+ sql_conn = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else
settings.SQL_ALCHEMY_CONN
+ db, flask_app = _get_flask_db(sql_conn)
with flask_app.app_context():
db.drop_all()
diff --git a/providers/fab/src/airflow/providers/fab/migrations/env.py
b/providers/fab/src/airflow/providers/fab/migrations/env.py
index 903057ba602..7a812c56e7a 100644
--- a/providers/fab/src/airflow/providers/fab/migrations/env.py
+++ b/providers/fab/src/airflow/providers/fab/migrations/env.py
@@ -24,6 +24,7 @@ from alembic import context
from airflow import settings
from airflow.providers.fab.auth_manager.models.db import FABDBManager
+from airflow.providers.fab.version_compat import AIRFLOW_V_3_3_PLUS
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
@@ -67,8 +68,9 @@ def run_migrations_offline():
script output.
"""
+ url: str = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else
str(settings.SQL_ALCHEMY_CONN)
context.configure(
- url=settings.SQL_ALCHEMY_CONN,
+ url=url,
target_metadata=target_metadata,
literal_binds=True,
compare_type=True,
diff --git a/providers/fab/src/airflow/providers/fab/version_compat.py
b/providers/fab/src/airflow/providers/fab/version_compat.py
index 92feca19762..419ebf48568 100644
--- a/providers/fab/src/airflow/providers/fab/version_compat.py
+++ b/providers/fab/src/airflow/providers/fab/version_compat.py
@@ -36,3 +36,4 @@ AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3,
1, 0)
AIRFLOW_V_3_1_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 1)
AIRFLOW_V_3_1_8_PLUS = get_base_airflow_version_tuple() >= (3, 1, 8)
AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
+AIRFLOW_V_3_3_PLUS = get_base_airflow_version_tuple() >= (3, 3, 0)
diff --git a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
index 452e5522fb7..69154acd5b4 100644
--- a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
+++ b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
@@ -32,6 +32,8 @@ from airflow.utils.db import (
compare_type,
)
+from tests_common.test_utils.config import conf_vars
+
pytestmark = [pytest.mark.db_test]
try:
from airflow.providers.fab.auth_manager.models.db import FABDBManager
@@ -109,14 +111,11 @@ try:
actual = mock_om.call_args.kwargs["revision"]
assert actual == "abc"
+ @conf_vars({("database", "sql_alchemy_conn"):
"sqlite:////tmp/fab-test.db"})
@mock.patch.object(FABDBManager, "get_current_revision")
def test_sqlite_offline_upgrade_raises_with_revision(self, mock_gcr,
session):
- with mock.patch(
-
"airflow.providers.fab.auth_manager.models.db.settings.SQL_ALCHEMY_CONN",
- "sqlite:////tmp/fab-test.db",
- ):
- with pytest.raises(SystemExit, match="Offline migration not
supported for SQLite"):
- FABDBManager(session).upgradedb(from_revision=None,
to_revision=None, show_sql_only=True)
+ with pytest.raises(SystemExit, match="Offline migration not
supported for SQLite"):
+ FABDBManager(session).upgradedb(from_revision=None,
to_revision=None, show_sql_only=True)
@mock.patch("alembic.command.upgrade")
@mock.patch.object(FABDBManager, "create_db_from_orm")
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 3e6236c5786..2269f22420f 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -344,6 +344,7 @@ def block_orm_access():
conf.set("database", "sql_alchemy_conn", conn)
conf.set("database", "sql_alchemy_conn_cmd", "/bin/false")
conf.set("database", "sql_alchemy_conn_secret",
"db-access-blocked")
+ settings.block_orm_access()
# This only gets called when the module does not already have an
attribute, and for these values
# lets give a custom error message
@@ -354,9 +355,6 @@ def block_orm_access():
settings.__getattr__ = __getattr__
- settings.SQL_ALCHEMY_CONN = conn
- settings.SQL_ALCHEMY_CONN_ASYNC = conn
-
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = conn
os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = conn