ashb commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3201723567
##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +259,84 @@ def _clear_asset_state(self, scope: AssetScope, *,
session: Session) -> None:
)
)
+ def cleanup(self) -> None:
+ """
+ Remove expired task state rows.
+
+ Reads ``[state_store] default_retention_days`` and ``[state_store]
state_cleanup_batch_size``
+ from config. Each pass runs in its own transaction so partial progress
is committed even if a
+ later pass fails. Each pass is batched to avoid long-running locks on
the table.
+
+ Two passes:
+ a. Rows where updated_at < now() - default_retention_days (global
retention)
+ b. Rows where expires_at < now() (per-key early expiry set by the
operator)
+ """
+ retention_days = conf.getint("state_store", "default_retention_days")
+ batch_size = conf.getint("state_store", "state_cleanup_batch_size")
+ now = timezone.utcnow()
+ older_than = now - timedelta(days=retention_days) if retention_days >
0 else None
+
+ pk_cols = (
+ TaskStateModel.dag_run_id,
+ TaskStateModel.task_id,
+ TaskStateModel.map_index,
+ TaskStateModel.key,
+ )
+
+ def _delete_batched(where_clause) -> int:
+ total = 0
+ while True:
+ with create_session() as session:
Review Comment:
I don't think this should create a new session object on every iteration of the loop.
Instead, use a single session and explicitly call `session.commit()` after
each batch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]