This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 439105f33b3 Add `task_store` table to `airflow db clean` mechanism (#68218)
439105f33b3 is described below
commit 439105f33b3644865c4a58b591ea9c16b4281a7c
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Jun 11 10:54:37 2026 +0530
Add `task_store` table to `airflow db clean` mechanism (#68218)
---
airflow-core/src/airflow/utils/db_cleanup.py | 7 +-
airflow-core/tests/unit/utils/test_db_cleanup.py | 108 +++++++++++++++++++++--
2 files changed, 105 insertions(+), 10 deletions(-)
diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py
index 8730a883a12..f0a6c2fe01a 100644
--- a/airflow-core/src/airflow/utils/db_cleanup.py
+++ b/airflow-core/src/airflow/utils/db_cleanup.py
@@ -140,7 +140,7 @@ config_list: list[_TableConfig] = [
keep_last=True,
keep_last_filters=[column("run_type") != DagRunType.MANUAL],
keep_last_group_by=["dag_id"],
- dependent_tables=["task_instance", "deadline"],
+ dependent_tables=["task_instance", "task_store", "deadline"],
),
_TableConfig(table_name="asset_event", recency_column_name="timestamp", dag_id_column_name="dag_id"),
_TableConfig(table_name="import_error", recency_column_name="timestamp"),
@@ -155,6 +155,11 @@ config_list: list[_TableConfig] = [
_TableConfig(
table_name="task_instance_history", recency_column_name="start_date", dag_id_column_name="dag_id"
),
+ _TableConfig(
+ table_name="task_store",
+ recency_column_name="expires_at",
+ dag_id_column_name="dag_id",
+ ),
_TableConfig(table_name="task_reschedule", recency_column_name="start_date", dag_id_column_name="dag_id"),
_TableConfig(table_name="xcom", recency_column_name="timestamp", dag_id_column_name="dag_id"),
_TableConfig(table_name="_xcom_archive", recency_column_name="timestamp", dag_id_column_name="dag_id"),
diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py
index a2fd47c6853..d7b81db63b3 100644
--- a/airflow-core/tests/unit/utils/test_db_cleanup.py
+++ b/airflow-core/tests/unit/utils/test_db_cleanup.py
@@ -37,6 +37,7 @@ from airflow.models import DagModel, DagRun, TaskInstance
from airflow.models.dag_version import DagVersion
from airflow.models.dagbundle import DagBundleModel
from airflow.models.serialized_dag import SerializedDagModel
+from airflow.models.task_store import TaskStoreModel
from airflow.providers.standard.operators.python import PythonOperator
from airflow.serialization.serialized_objects import LazyDeserializedDAG
from airflow.utils.db_cleanup import (
@@ -878,6 +879,94 @@ def create_tis(base_date, num_tis, run_type=DagRunType.SCHEDULED):
session.commit()
+@pytest.mark.db_test
+class TestTaskStoreCleanup:
+ def test_expired_rows_deleted(self):
+ cfg = config_dict["task_store"]
+ now = pendulum.now(tz="UTC")
+ past = now.subtract(days=30)
+ future = now.add(days=30)
+
+ with create_session() as session:
+ bundle = DagBundleModel(name="ts_test_bundle")
+ session.add(bundle)
+ session.flush()
+
+ dag = DAG(dag_id="ts_test_dag")
+ dm = DagModel(dag_id="ts_test_dag", bundle_name="ts_test_bundle")
+ session.add(dm)
+ SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="ts_test_bundle")
+
+ dag_run = DagRun(
+ "ts_test_dag",
+ run_id="ts_test_run",
+ run_type=DagRunType.SCHEDULED,
+ start_date=past,
+ )
+ session.add(dag_run)
+ session.flush()
+
+ expired = TaskStoreModel(
+ dag_run_id=dag_run.id,
+ task_id="t1",
+ map_index=-1,
+ key="job_id",
+ dag_id="ts_test_dag",
+ run_id="ts_test_run",
+ value="job-expired",
+ updated_at=past,
+ expires_at=past.subtract(days=1),
+ )
+ never_expire = TaskStoreModel(
+ dag_run_id=dag_run.id,
+ task_id="t1",
+ map_index=-1,
+ key="result",
+ dag_id="ts_test_dag",
+ run_id="ts_test_run",
+ value="job-never-expire",
+ updated_at=past,
+ expires_at=None,
+ )
+ not_yet_expired = TaskStoreModel(
+ dag_run_id=dag_run.id,
+ task_id="t1",
+ map_index=-1,
+ key="future_key",
+ dag_id="ts_test_dag",
+ run_id="ts_test_run",
+ value="job-future",
+ updated_at=past,
+ expires_at=future,
+ )
+ session.add_all([expired, never_expire, not_yet_expired])
+ session.commit()
+
+ cutoff = now.subtract(hours=1)
+ with create_session() as session:
+ _cleanup_table(
+ **cfg.__dict__,
+ clean_before_timestamp=cutoff,
+ dry_run=False,
+ verbose=False,
+ confirm=False,
+ skip_archive=True,
+ session=session,
+ )
+
+ with create_session() as session:
+ not_deleted = {
+ row.key
+ for row in session.scalars(
+ select(TaskStoreModel).where(TaskStoreModel.dag_id == "ts_test_dag")
+ ).all()
+ }
+
+ assert "job_id" not in not_deleted, "expired row should be deleted"
+ assert "result" in not_deleted, "NEVER_EXPIRE row (expires_at=NULL) must survive"
+ assert "future_key" in not_deleted, "not-yet-expired row must survive"
+
+
@pytest.mark.db_test
class TestConnectionTestRequestCleanup:
"""Verify db_cleanup never deletes in-flight connection tests (kaxil r3169602754)."""
@@ -917,15 +1006,16 @@ class TestConnectionTestRequestCleanup:
# Run cleanup with a cutoff well past every seeded row.
cutoff = pendulum.now(tz="UTC").subtract(days=1)
- _cleanup_table(
- **cfg.__dict__,
- clean_before_timestamp=cutoff,
- dry_run=False,
- verbose=False,
- confirm=False,
- skip_archive=True,
- session=create_session().__enter__(),
- )
+ with create_session() as session:
+ _cleanup_table(
+ **cfg.__dict__,
+ clean_before_timestamp=cutoff,
+ dry_run=False,
+ verbose=False,
+ confirm=False,
+ skip_archive=True,
+ session=session,
+ )
with create_session() as s:
survivors = {