amoghrajesh commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3201551462


##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +253,41 @@ def _clear_asset_state(self, scope: AssetScope, *, session: Session) -> None:
             )
         )
 
+    def cleanup(self) -> None:
+        """
+        Remove expired task state rows and orphaned asset state rows.
+
+        Reads ``[state_store] default_retention_days`` from config to determine the time-based
+        cutoff. Set to 0 to disable time-based cleanup (expires_at and orphan cleanup still run).
+
+        Two passes for task_state:
+        a. Rows where updated_at < now() - default_retention_days (global retention)
+        b. Rows where expires_at < now() (per-key early expiry set by the operator)
+
+        Asset state orphan cleanup: asset_state rows whose FK'd asset has no asset_active
+        entry are stale (asset removed from all DAGs) and are deleted here.
+        """
+        from datetime import timedelta
+
+        from airflow.configuration import conf
+
+        retention_days = conf.getint("state_store", "default_retention_days")
+        now = timezone.utcnow()
+        older_than = now - timedelta(days=retention_days) if retention_days > 0 else None
+        with create_session() as session:
+            if older_than:
+                session.execute(delete(TaskStateModel).where(TaskStateModel.updated_at < older_than))
+            session.execute(
+                delete(TaskStateModel).where(
+                    TaskStateModel.expires_at.isnot(None),
+                    TaskStateModel.expires_at < now,
+                )
+            )
+            active_asset_ids = select(AssetModel.id).join(
+                AssetActive, (AssetActive.name == AssetModel.name) & (AssetActive.uri == AssetModel.uri)
+            )
+            session.execute(delete(AssetStateModel).where(AssetStateModel.asset_id.not_in(active_asset_ids)))
+

Review Comment:
   Handled it in: cdc423703e



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to