This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 74d0c476224 Refactor db_cleanup.py to use SQLA2 (#60377)
74d0c476224 is described below
commit 74d0c476224b81f01287384362b08ac496e6b92d
Author: Justin Pakzad <[email protected]>
AuthorDate: Thu Jan 15 10:23:15 2026 -0500
Refactor db_cleanup.py to use SQLA2 (#60377)
---
.pre-commit-config.yaml | 1 +
airflow-core/src/airflow/utils/db_cleanup.py | 34 ++++++++++++++++------------
2 files changed, 21 insertions(+), 14 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 51ad605648c..5d872584977 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -433,6 +433,7 @@ repos:
(?x)
^airflow-ctl.*\.py$|
^airflow-core/src/airflow/models/.*\.py$|
+ ^airflow-core/src/airflow/utils/db_cleanup.py$|
^airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py$|
^airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$|
diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py
index 8b6ce79a772..025303ce8f2 100644
--- a/airflow-core/src/airflow/utils/db_cleanup.py
+++ b/airflow-core/src/airflow/utils/db_cleanup.py
@@ -47,7 +47,8 @@ from airflow.utils.types import DagRunType
if TYPE_CHECKING:
from pendulum import DateTime
- from sqlalchemy.orm import Query, Session
+ from sqlalchemy import Select
+ from sqlalchemy.orm import Session
from airflow.models import Base
@@ -183,8 +184,8 @@ if (
config_dict: dict[str, _TableConfig] = {x.orm_model.name: x for x in sorted(config_list)}
-def _check_for_rows(*, query: Query, print_rows: bool = False) -> int:
- num_entities = query.count()
+def _check_for_rows(*, session: Session, query: Select, print_rows: bool = False) -> int:
+ num_entities = session.scalars(select(func.count()).select_from(query.subquery())).one()
print(f"Found {num_entities} rows meeting deletion criteria.")
if not print_rows or num_entities == 0:
return num_entities
@@ -192,7 +193,7 @@ def _check_for_rows(*, query: Query, print_rows: bool = False) -> int:
max_rows_to_print = 100
print(f"Printing first {max_rows_to_print} rows.")
logger.debug("print entities query: %s", query)
- for entry in query.limit(max_rows_to_print):
+ for entry in session.execute(query.limit(max_rows_to_print)):
print(entry.__dict__)
return num_entities
@@ -213,7 +214,7 @@ def _dump_table_to_file(*, target_table: str, file_path: str, export_format: str
def _do_delete(
- *, query: Query, orm_model: Base, skip_archive: bool, session: Session, batch_size: int | None
+ *, query: Select, orm_model: Base, skip_archive: bool, session: Session, batch_size: int | None
) -> None:
import itertools
import re
@@ -224,7 +225,9 @@ def _do_delete(
while True:
limited_query = query.limit(batch_size) if batch_size else query
- if limited_query.count() == 0: # nothing left to delete
+ if (
+ session.scalars(select(func.count()).select_from(limited_query.subquery())).one() == 0
+ ): # nothing left to delete
break
batch_no = next(batch_counter)
@@ -290,13 +293,17 @@ def _do_delete(
def _subquery_keep_last(
- *, recency_column, keep_last_filters, group_by_columns, max_date_colname, session: Session
+ *,
+ recency_column,
+ keep_last_filters,
+ group_by_columns,
+ max_date_colname,
):
subquery = select(*group_by_columns, func.max(recency_column).label(max_date_colname))
if keep_last_filters is not None:
for entry in keep_last_filters:
- subquery = subquery.filter(entry)
+ subquery = subquery.where(entry)
if group_by_columns is not None:
subquery = subquery.group_by(*group_by_columns)
@@ -332,10 +339,10 @@ def _build_query(
dag_ids: list[str] | None = None,
exclude_dag_ids: list[str] | None = None,
**kwargs,
-) -> Query:
+) -> Select:
base_table_alias = "base"
base_table = aliased(orm_model, name=base_table_alias)
- query = session.query(base_table).with_entities(text(f"{base_table_alias}.*"))
+ query = select(text(f"{base_table_alias}.*")).select_from(base_table)
base_table_recency_col = base_table.c[recency_column.name]
conditions = [base_table_recency_col < clean_before_timestamp]
@@ -355,9 +362,8 @@ def _build_query(
keep_last_filters=keep_last_filters,
group_by_columns=group_by_columns,
max_date_colname=max_date_col_name,
- session=session,
)
- query = query.select_from(base_table).outerjoin(
+ query = query.outerjoin(
subquery,
and_(
*[base_table.c[x] == subquery.c[x] for x in keep_last_group_by], # type: ignore[attr-defined]
@@ -365,7 +371,7 @@ def _build_query(
),
)
conditions.append(column(max_date_col_name).is_(None))
- query = query.filter(and_(*conditions))
+ query = query.where(and_(*conditions))
return query
@@ -404,7 +410,7 @@ def _cleanup_table(
)
logger.debug("old rows query:\n%s", query.selectable.compile())
print(f"Checking table {orm_model.name}")
- num_rows = _check_for_rows(query=query, print_rows=False)
+ num_rows = _check_for_rows(session=session, query=query, print_rows=False)
if num_rows and not dry_run:
_do_delete(