This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fdb49e13dab Remove findings from positional session check in Core
Utils (#67777)
fdb49e13dab is described below
commit fdb49e13dab85e5415955ca2c117f522153144c8
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun May 31 00:18:46 2026 +0200
Remove findings from positional session check in Core Utils (#67777)
* Fix exceptions of positional session use in airflow-core utils
* Fix fab migration
---
.../api_fastapi/core_api/routes/public/connections.py | 2 +-
airflow-core/src/airflow/utils/cli_action_loggers.py | 1 +
airflow-core/src/airflow/utils/db.py | 16 ++++++++--------
airflow-core/src/airflow/utils/db_cleanup.py | 3 ++-
airflow-core/src/airflow/utils/log/file_task_handler.py | 2 +-
devel-common/src/tests_common/test_utils/db.py | 4 ++--
.../tests/unit/fab/auth_manager/test_fab_auth_manager.py | 2 +-
scripts/ci/prek/known_provide_session_positional.txt | 4 ----
8 files changed, 16 insertions(+), 18 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
index 2bf8f99e059..5bab2204548 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
@@ -268,4 +268,4 @@ def create_default_connections(
session: SessionDep,
):
"""Create default connections."""
- db_create_default_connections(session)
+ db_create_default_connections(session=session)
diff --git a/airflow-core/src/airflow/utils/cli_action_loggers.py
b/airflow-core/src/airflow/utils/cli_action_loggers.py
index 6ec28079af5..632430e8a93 100644
--- a/airflow-core/src/airflow/utils/cli_action_loggers.py
+++ b/airflow-core/src/airflow/utils/cli_action_loggers.py
@@ -112,6 +112,7 @@ def default_action_log(
logical_date,
host_name,
full_command,
+ *,
session: Session = NEW_SESSION,
**_,
):
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index 0d3bf5ef1aa..791e3041da7 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -173,7 +173,7 @@ def timeout_with_traceback(seconds, message="Operation
timed out"):
@provide_session
-def merge_conn(conn: Connection, session: Session = NEW_SESSION):
+def merge_conn(conn: Connection, *, session: Session = NEW_SESSION):
"""Add new Connection."""
if not session.scalar(select(1).where(conn.__class__.conn_id ==
conn.conn_id)):
session.add(conn)
@@ -181,7 +181,7 @@ def merge_conn(conn: Connection, session: Session =
NEW_SESSION):
@provide_session
-def add_default_pool_if_not_exists(session: Session = NEW_SESSION):
+def add_default_pool_if_not_exists(*, session: Session = NEW_SESSION):
"""Add default pool if it does not exist."""
from airflow.models.pool import Pool
@@ -197,12 +197,12 @@ def add_default_pool_if_not_exists(session: Session =
NEW_SESSION):
@provide_session
-def create_default_connections(session: Session = NEW_SESSION):
+def create_default_connections(*, session: Session = NEW_SESSION):
"""Create default Airflow connections."""
conns = get_default_connections()
for c in conns:
- merge_conn(c, session)
+ merge_conn(c, session=session)
def get_default_connections():
@@ -836,7 +836,7 @@ def _single_connection_pool() -> Generator[None, None,
None]:
@provide_session
-def initdb(session: Session = NEW_SESSION, use_migration_files: bool = False):
+def initdb(*, session: Session = NEW_SESSION, use_migration_files: bool =
False):
"""
Initialize Airflow database.
@@ -1104,7 +1104,7 @@ def reflect_tables(tables: list[MappedClassProtocol |
str] | None, session):
@provide_session
-def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]:
+def _check_migration_errors(*, session: Session = NEW_SESSION) ->
Iterable[str]:
""":session: session of the sqlalchemy."""
check_functions: Iterable[Callable[..., Iterable[str]]] = ()
for check_fn in check_functions:
@@ -1334,7 +1334,7 @@ def _resetdb_default(session: Session) -> None:
@provide_session
-def resetdb(session: Session = NEW_SESSION, skip_init: bool = False,
use_migration_files: bool = False):
+def resetdb(*, session: Session = NEW_SESSION, skip_init: bool = False,
use_migration_files: bool = False):
"""
Clear out the database.
@@ -1521,7 +1521,7 @@ def drop_airflow_moved_tables(connection):
@provide_session
-def check(session: Session = NEW_SESSION):
+def check(*, session: Session = NEW_SESSION):
"""
Check if the database works.
diff --git a/airflow-core/src/airflow/utils/db_cleanup.py
b/airflow-core/src/airflow/utils/db_cleanup.py
index 0c605b8d6bd..f3c8400efc7 100644
--- a/airflow-core/src/airflow/utils/db_cleanup.py
+++ b/airflow-core/src/airflow/utils/db_cleanup.py
@@ -654,6 +654,7 @@ def export_archived_records(
table_names: list[str] | None = None,
drop_archives: bool = False,
needs_confirm: bool = True,
+ *,
session: Session = NEW_SESSION,
) -> None:
"""Export archived data to the given output path in the given format."""
@@ -682,7 +683,7 @@ def export_archived_records(
@provide_session
def drop_archived_tables(
- table_names: list[str] | None, needs_confirm: bool, session: Session =
NEW_SESSION
+ table_names: list[str] | None, needs_confirm: bool, *, session: Session =
NEW_SESSION
) -> None:
"""Drop archived tables."""
archived_table_names = _get_archived_table_names(table_names, session)
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py
b/airflow-core/src/airflow/utils/log/file_task_handler.py
index e2b6bcb6165..8a94b513b41 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -521,7 +521,7 @@ class FileTaskHandler(logging.Handler):
@provide_session
def _render_filename(
- self, ti: TaskInstance | TaskInstanceHistory, try_number: int,
session=NEW_SESSION
+ self, ti: TaskInstance | TaskInstanceHistory, try_number: int, *,
session=NEW_SESSION
) -> str:
"""Return the worker log filename."""
dag_run = ti.get_dagrun(session=session)
diff --git a/devel-common/src/tests_common/test_utils/db.py
b/devel-common/src/tests_common/test_utils/db.py
index cbfb0b377ae..bf8a7b6d59f 100644
--- a/devel-common/src/tests_common/test_utils/db.py
+++ b/devel-common/src/tests_common/test_utils/db.py
@@ -331,7 +331,7 @@ def clear_db_serialized_dags():
def clear_db_pools():
with create_session() as session:
session.execute(delete(Pool))
- add_default_pool_if_not_exists(session)
+ add_default_pool_if_not_exists(session=session)
def clear_test_connections(add_default_connections_back=True):
@@ -350,7 +350,7 @@ def clear_db_connections(add_default_connections_back=True):
with create_session() as session:
session.execute(delete(Connection))
if add_default_connections_back:
- create_default_connections(session)
+ create_default_connections(session=session)
@_retry_db
diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
index 6b253207079..28c95a77273 100644
--- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
+++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
@@ -1035,7 +1035,7 @@ def test_resetdb(
mock_connect = mock_engine.connect.return_value
session_mock = MagicMock()
- resetdb(session_mock, skip_init=skip_init)
+ resetdb(session=session_mock, skip_init=skip_init)
# In the non-MySQL path, drop functions are called with the raw connection
mock_drop_airflow.assert_called_once_with(mock_connect)
diff --git a/scripts/ci/prek/known_provide_session_positional.txt
b/scripts/ci/prek/known_provide_session_positional.txt
index 63ae75089a3..d2ba9508cae 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -20,10 +20,6 @@ airflow-core/src/airflow/models/trigger.py::7
airflow-core/src/airflow/models/variable.py::2
airflow-core/src/airflow/secrets/metastore.py::2
airflow-core/src/airflow/serialization/definitions/dag.py::2
-airflow-core/src/airflow/utils/cli_action_loggers.py::1
-airflow-core/src/airflow/utils/db.py::7
-airflow-core/src/airflow/utils/db_cleanup.py::2
-airflow-core/src/airflow/utils/log/file_task_handler.py::1
airflow-core/tests/unit/jobs/test_scheduler_job.py::1
airflow-core/tests/unit/listeners/test_listeners.py::7
airflow-core/tests/unit/models/test_taskinstance.py::4