amoghrajesh commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3224009523
##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +272,51 @@ def _clear_asset_state(self, scope: AssetScope, *, session: Session) -> None:
)
)
+    def cleanup(self) -> None:
+        """
+        Remove expired task state rows.
+
+        ``expires_at`` is set at write time on every ``set()`` call, so cleanup is a single
+        ``WHERE expires_at < now()`` pass. Rows with ``expires_at=NULL`` (default_retention_days=0)
+        are never deleted. Batching is configurable via ``[state_store] state_cleanup_batch_size``.
+        """
+        batch_size = conf.getint("state_store", "state_cleanup_batch_size")
+        now = timezone.utcnow()
+
+        def _delete_batched(where_clause) -> int:
+            total = 0
+            with create_session() as session:
+                while True:
+                    id_query = select(TaskStateModel.id).where(where_clause)
+                    if batch_size > 0:
+                        id_query = id_query.limit(batch_size)
+                    ids = session.scalars(id_query).all()
+                    if not ids:
+                        break
+                    session.execute(delete(TaskStateModel).where(TaskStateModel.id.in_(ids)))
+                    session.commit()
+                    total += len(ids)
+                    if batch_size <= 0 or len(ids) < batch_size:
+                        break
+            return total
+
+        deleted = _delete_batched(TaskStateModel.expires_at < now)
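For readers outside Airflow, the select-IDs-then-delete batching loop in `_delete_batched` can be sketched with the standard-library `sqlite3` module. This is a minimal sketch under stated assumptions, not the PR's implementation: the `task_state` table, its columns, and the `delete_expired_batched` helper are hypothetical stand-ins for `TaskStateModel` and `_delete_batched`, and timestamps are stored as ISO-8601 strings with the same UTC offset so they compare correctly as text. The row with `expires_at=NULL` is never selected because `NULL < x` is not true in SQL.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def delete_expired_batched(conn: sqlite3.Connection, now: datetime, batch_size: int) -> int:
    """Hypothetical stand-in for `_delete_batched`: batch_size <= 0 means "no limit"."""
    total = 0
    cutoff = now.isoformat()
    while True:
        # Select one batch of IDs first, like select(TaskStateModel.id).limit(...)
        sql = "SELECT id FROM task_state WHERE expires_at < ?"
        params: tuple = (cutoff,)
        if batch_size > 0:
            sql += " LIMIT ?"
            params = (cutoff, batch_size)
        ids = [row[0] for row in conn.execute(sql, params).fetchall()]
        if not ids:
            break
        placeholders = ",".join("?" for _ in ids)
        conn.execute(f"DELETE FROM task_state WHERE id IN ({placeholders})", ids)
        conn.commit()  # each batch is its own short transaction
        total += len(ids)
        # An unbounded run, or a batch smaller than the limit, means nothing is left.
        if batch_size <= 0 or len(ids) < batch_size:
            break
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_state (id INTEGER PRIMARY KEY, expires_at TEXT)")
now = datetime(2025, 1, 10, tzinfo=timezone.utc)
rows = [(now - timedelta(days=d)).isoformat() for d in range(1, 6)]  # 5 expired rows
rows += [(now + timedelta(days=1)).isoformat(), None]  # 1 live row, 1 keep-forever row
conn.executemany("INSERT INTO task_state (expires_at) VALUES (?)", [(r,) for r in rows])
conn.commit()

deleted = delete_expired_batched(conn, now, batch_size=2)
remaining = conn.execute("SELECT COUNT(*) FROM task_state").fetchone()[0]
print(deleted, remaining)  # 5 2
```

With `batch_size=2` the loop runs three times (2, 2, then 1 row), committing after each batch so no single transaction holds locks on the whole expired set.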
Review Comment:
A second pass would be something like `WHERE expires_at IS NULL AND
updated_at < now - default_retention_days`, to catch rows that were written
when `default_retention_days=0` but the config was later raised.
Something like:
```python
from datetime import timedelta

# Pass 1: the current code
deleted_expired = _delete_batched(TaskStateModel.expires_at < now)
# Pass 2: rows with NULL expires_at that are stale under the current global default
# (default_retention_days is the current global default, read from config)
if default_retention_days > 0:
    cutoff = now - timedelta(days=default_retention_days)
    deleted_stale = _delete_batched(
        TaskStateModel.expires_at.is_(None) & (TaskStateModel.updated_at < cutoff)
    )
```
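To make the difference between the two passes concrete, here is a pure-Python sketch that applies both predicates to in-memory rows. The `Row` dataclass and the `pass1_matches`/`pass2_matches` helpers are hypothetical; they only mirror the `WHERE` clauses in the snippet above and are not Airflow code.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class Row:
    key: str
    expires_at: Optional[datetime]  # None = written with retention disabled
    updated_at: datetime


def pass1_matches(row: Row, now: datetime) -> bool:
    # WHERE expires_at < now  (a comparison against SQL NULL is never true)
    return row.expires_at is not None and row.expires_at < now


def pass2_matches(row: Row, now: datetime, default_retention_days: int) -> bool:
    # WHERE expires_at IS NULL AND updated_at < now - default_retention_days
    if default_retention_days <= 0:
        return False  # mirrors the `if default_retention_days > 0` guard
    cutoff = now - timedelta(days=default_retention_days)
    return row.expires_at is None and row.updated_at < cutoff


now = datetime(2025, 6, 1, tzinfo=timezone.utc)
rows = [
    Row("expired", now - timedelta(days=1), now - timedelta(days=31)),
    Row("live", now + timedelta(days=5), now - timedelta(days=25)),
    Row("old_null", None, now - timedelta(days=90)),
    Row("recent_null", None, now - timedelta(days=2)),
]
print([r.key for r in rows if pass1_matches(r, now)])  # ['expired']
print([r.key for r in rows if pass2_matches(r, now, default_retention_days=30)])  # ['old_null']
```

Pass 2 only ever touches rows that pass 1 cannot reach: the `NULL` rows.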
Both passes would run in the same `airflow state-store cleanup` command.
But on further thought, I don't think the second pass is needed. `expires_at=NULL`
is an explicit signal: either `default_retention_days=0` was set, or
`retention_days=0` was passed at write time. Both mean "keep this row forever."
Retroactively deleting these rows after a config change would break the promise
made at write time.
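The write-time contract this argument relies on can be sketched as follows. This is a hypothetical illustration, not Airflow's `set()` implementation: the `compute_expires_at` and `is_expired` helpers are invented for the example, and it assumes a per-call `retention_days` override that falls back to `default_retention_days` when omitted, with 0 meaning "no expiry" and producing `expires_at=None`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_expires_at(
    now: datetime,
    default_retention_days: int,
    retention_days: Optional[int] = None,
) -> Optional[datetime]:
    """Hypothetical write-time rule: 0 days means 'keep forever' (expires_at=None)."""
    days = default_retention_days if retention_days is None else retention_days
    if days <= 0:
        return None
    return now + timedelta(days=days)


def is_expired(expires_at: Optional[datetime], now: datetime) -> bool:
    # Cleanup only compares against the stored value; the config is never re-read.
    return expires_at is not None and expires_at < now


written = datetime(2025, 1, 1, tzinfo=timezone.utc)
keep_forever = compute_expires_at(written, default_retention_days=0)
explicit_zero = compute_expires_at(written, default_retention_days=30, retention_days=0)
thirty_days = compute_expires_at(written, default_retention_days=30)

# A year later, even if default_retention_days has since been raised,
# only the row written with a finite retention is expired.
later = written + timedelta(days=365)
print([is_expired(e, later) for e in (keep_forever, explicit_zero, thirty_days)])
# [False, False, True]
```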
--
This is an automated message from the Apache Git Service.