jason810496 commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3198954290


##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +253,41 @@ def _clear_asset_state(self, scope: AssetScope, *, session: Session) -> None:
             )
         )
 
+    def cleanup(self) -> None:
+        """
+        Remove expired task state rows and orphaned asset state rows.
+
+        Reads ``[state_store] default_retention_days`` from config to determine the time-based
+        cutoff. Set to 0 to disable time-based cleanup (expires_at and orphan cleanup still run).
+
+        Two passes for task_state:
+        a. Rows where updated_at < now() - default_retention_days (global retention)
+        b. Rows where expires_at < now() (per-key early expiry set by the operator)
+
+        Asset state orphan cleanup: asset_state rows whose FK'd asset has no asset_active
+        entry are stale (asset removed from all DAGs) and are deleted here.
+        """
+        from datetime import timedelta
+
+        from airflow.configuration import conf
+
+        retention_days = conf.getint("state_store", "default_retention_days")
+        now = timezone.utcnow()
+        older_than = now - timedelta(days=retention_days) if retention_days > 0 else None
+        with create_session() as session:
+            if older_than:
+                session.execute(delete(TaskStateModel).where(TaskStateModel.updated_at < older_than))
+            session.execute(
+                delete(TaskStateModel).where(
+                    TaskStateModel.expires_at.isnot(None),
+                    TaskStateModel.expires_at < now,
+                )
+            )
+            active_asset_ids = select(AssetModel.id).join(
+                AssetActive, (AssetActive.name == AssetModel.name) & (AssetActive.uri == AssetModel.uri)
+            )
+            session.execute(delete(AssetStateModel).where(AssetStateModel.asset_id.not_in(active_asset_ids)))
+

Review Comment:
   If it's a valid assumption that users might produce a large number of state
   records within one `state_cleanup_interval` window, I have some concerns about
   the single-pass delete transaction here.

   I just double-checked the concern with Claude:
   
   ---
   
   Yes, this is a real problem. Several compounding issues:
   
   1. Missing indexes — every cleanup will be a full table scan
   
   The two predicates the cleanup filters on are not indexed:
   - task_state.updated_at: no index (only task_state_pkey on
     (dag_run_id, task_id, map_index, key) and idx_task_state_lookup on
     (dag_id, run_id, task_id, map_index))
   - task_state.expires_at: no index (just added in this PR)
   
   So both DELETE WHERE updated_at < cutoff and DELETE WHERE expires_at < now()
   do full sequential scans. On a deployment with millions of rows that's minutes
   of scanning every 24h, plus the locks held for the whole duration.
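
A minimal sketch of the scan-versus-index difference described above. It uses the standard-library `sqlite3` module as a stand-in for the real metadata database, a trimmed-down `task_state` table, and a hypothetical index name (`idx_task_state_updated_at`) that is not part of the PR. Planner output differs on Postgres and MySQL, so treat this only as an illustration of the principle.

```python
import sqlite3

# Stand-in schema: only the columns the cleanup predicates touch.
# The real task_state table has more columns and a composite primary key.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_state ("
    " id INTEGER PRIMARY KEY,"
    " key TEXT NOT NULL,"
    " updated_at TEXT NOT NULL,"
    " expires_at TEXT)"
)

DELETE_SQL = "DELETE FROM task_state WHERE updated_at < ?"


def query_plan(connection, sql, params):
    """Return the planner's description of how it would run `sql`."""
    rows = connection.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    # The fourth column of each row is the human-readable plan detail.
    return [row[3] for row in rows]


# Without an index on updated_at the planner has to scan the table.
plan_before = query_plan(conn, DELETE_SQL, ("2024-01-01",))

# Hypothetical index name; the PR does not define one.
conn.execute("CREATE INDEX idx_task_state_updated_at ON task_state (updated_at)")

# With the index in place the planner can search the index instead.
plan_after = query_plan(conn, DELETE_SQL, ("2024-01-01",))

print(plan_before)
print(plan_after)
```

An equivalent index on `expires_at` would cover the second pass in the same way.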
   
   2. No batching / no LIMIT
   
   Compare to airflow db cleanup (utils/db_cleanup.py:217), which deletes in
   configurable batches and commits between them. The new path runs three plain
   bulk DELETEs in a single session. Long-running bulk DELETE means:
   - Row locks held for the duration (writers calling task_state.set() upserts on
     matching rows block; they queue behind the cleanup transaction).
   - On Postgres: massive WAL churn, autovacuum can't keep up, table bloat.
   - On MySQL/InnoDB at REPEATABLE READ (InnoDB's default isolation level):
     next-key/gap locks make conflicts even more likely.
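
To illustrate the batching idea, here is a rough sketch that deletes a bounded number of rows per statement and commits between batches. It is not the `airflow db cleanup` implementation: it runs against an in-memory `sqlite3` table standing in for `task_state`, and the function name and `batch_size` default are assumptions. The `IN (SELECT ... LIMIT ?)` form works on SQLite and Postgres; MySQL rejects `LIMIT` inside an `IN` subquery and would need a different statement shape.

```python
import sqlite3


def delete_expired_in_batches(conn, cutoff, batch_size=1000):
    """Delete rows older than `cutoff`, committing after every batch.

    Returns the total number of rows deleted.
    """
    total = 0
    while True:
        cur = conn.execute(
            # Pick a bounded set of rows first, then delete only those.
            "DELETE FROM task_state WHERE id IN ("
            " SELECT id FROM task_state WHERE updated_at < ? LIMIT ?)",
            (cutoff, batch_size),
        )
        # Committing here releases the locks taken by this batch.
        conn.commit()
        if cur.rowcount == 0:
            return total
        total += cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_state (id INTEGER PRIMARY KEY, updated_at TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO task_state (updated_at) VALUES (?)",
    [("2023-01-01",)] * 2500 + [("2025-01-01",)] * 10,
)
conn.commit()

deleted = delete_expired_in_batches(conn, cutoff="2024-01-01", batch_size=1000)
remaining = conn.execute("SELECT COUNT(*) FROM task_state").fetchone()[0]
print(deleted, remaining)  # 2500 10
```

Each batch holds its locks only until the next commit, so concurrent writers wait for one batch at most rather than for the whole cleanup.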
   
   3. All three DELETEs share one transaction
   
   with create_session() as session: opens one session; each session.execute()
   runs inside it; commit happens at exit. If pass 1 takes 90s, the locks from
   pass 1 are held while pass 2 and pass 3 run. A failure in pass 3 rolls back
   passes 1 and 2 (cleanup makes no forward progress at all).
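
One way to avoid the all-or-nothing behaviour is to commit each pass on its own, so a failure in a later pass does not undo earlier ones. The sketch below shows this with `sqlite3` as a stand-in database; `run_passes_independently` and the pass names are hypothetical, and the third statement is deliberately broken to simulate a failing pass.

```python
import sqlite3


def run_passes_independently(conn, passes):
    """Run each (name, sql, params) pass in its own transaction.

    Returns {name: rows_deleted} for passes that committed and
    {name: exception} for passes that failed and were rolled back.
    """
    outcome = {}
    for name, sql, params in passes:
        try:
            cur = conn.execute(sql, params)
            conn.commit()  # this pass is durable from here on
            outcome[name] = cur.rowcount
        except sqlite3.Error as exc:
            conn.rollback()  # undo only the failing pass
            outcome[name] = exc
    return outcome


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_state (id INTEGER PRIMARY KEY, updated_at TEXT, expires_at TEXT)"
)
conn.executemany(
    "INSERT INTO task_state (updated_at, expires_at) VALUES (?, ?)",
    [("2023-01-01", None), ("2025-06-01", "2025-06-02"), ("2025-06-01", None)],
)
conn.commit()

outcome = run_passes_independently(
    conn,
    [
        ("retention", "DELETE FROM task_state WHERE updated_at < ?", ("2024-01-01",)),
        (
            "expiry",
            "DELETE FROM task_state WHERE expires_at IS NOT NULL AND expires_at < ?",
            ("2025-07-01",),
        ),
        # Deliberately broken: asset_state does not exist in this toy database.
        ("orphans", "DELETE FROM asset_state WHERE asset_id NOT IN (SELECT 1)", ()),
    ],
)
remaining = conn.execute("SELECT COUNT(*) FROM task_state").fetchone()[0]
print(outcome["retention"], outcome["expiry"], remaining)  # 1 1 1
```

The first two passes stay committed even though the third fails, so the cleanup still makes forward progress.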
   
   4. Scheduler main loop is blocked
   
   _cleanup_expired_task_state is registered via call_regular_interval, which is
   synchronous in the scheduler loop. Same pattern as
   _remove_unreferenced_triggers and _update_asset_orphanage — but those have
   small cardinality. task_state is user-driven and unbounded (the AIP encourages
   users to write a lot of it). With a multi-minute cleanup the scheduler is not
   scheduling for those minutes.
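
One possible mitigation, sketched under assumptions: give each cleanup invocation a time budget and let the next interval pick up whatever is left. Nothing here is an Airflow API; `cleanup_with_budget`, its parameters, and the `sqlite3` stand-in table are illustrative only.

```python
import sqlite3
import time


def cleanup_with_budget(conn, cutoff, budget_seconds, batch_size=500, clock=time.monotonic):
    """Delete old rows in batches until done or the time budget is spent.

    Returns (rows_deleted, finished). `finished` is False when the budget
    ran out and more rows may remain for the next invocation.
    """
    deadline = clock() + budget_seconds
    deleted = 0
    while clock() < deadline:
        cur = conn.execute(
            "DELETE FROM task_state WHERE id IN ("
            " SELECT id FROM task_state WHERE updated_at < ? LIMIT ?)",
            (cutoff, batch_size),
        )
        conn.commit()  # keep each batch's transaction short
        if cur.rowcount == 0:
            return deleted, True
        deleted += cur.rowcount
    return deleted, False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_state (id INTEGER PRIMARY KEY, updated_at TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO task_state (updated_at) VALUES (?)", [("2023-01-01",)] * 10
)
conn.commit()

# A generous budget lets this small table be cleaned in one call.
deleted, finished = cleanup_with_budget(conn, "2024-01-01", budget_seconds=5.0, batch_size=3)
print(deleted, finished)  # 10 True
```

The `clock` parameter exists so the budget logic can be exercised with a fake clock; the caller blocks for at most roughly `budget_seconds` plus the duration of one batch.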
     



