Lee-W commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3232978829


##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +272,51 @@ def _clear_asset_state(self, scope: AssetScope, *, session: Session) -> None:
             )
         )
 
+    def cleanup(self) -> None:
+        """
+        Remove expired task state rows.
+
+        ``expires_at`` is set at write time on every ``set()`` call, so cleanup is a single
+        ``WHERE expires_at < now()`` pass. Rows with ``expires_at=NULL`` (default_retention_days=0)
+        are never deleted. Batching is configurable via ``[state_store] state_cleanup_batch_size``.
+        """
+        batch_size = conf.getint("state_store", "state_cleanup_batch_size")
+        now = timezone.utcnow()
+
+        def _delete_batched(where_clause) -> int:
+            total = 0
+            with create_session() as session:
+                while True:
+                    id_query = select(TaskStateModel.id).where(where_clause)
+                    if batch_size > 0:
+                        id_query = id_query.limit(batch_size)
+                    ids = session.scalars(id_query).all()
+                    if not ids:
+                        break
+                    session.execute(delete(TaskStateModel).where(TaskStateModel.id.in_(ids)))
+                    session.commit()
+                    total += len(ids)
+                    if batch_size <= 0 or len(ids) < batch_size:
+                        break
+            return total
+
+        deleted = _delete_batched(TaskStateModel.expires_at < now)
+        log.info("Deleted expired task_state rows", rows_deleted=deleted)
+
+    def _summary_dry_run_(self) -> dict[str, list]:

Review Comment:
   ```suggestion
       def _summary_dry_run(self) -> dict[str, list]:
   ```
   
   Typo? The trailing underscore in `_summary_dry_run_` looks unintentional.
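
For reference, the batched-delete loop in the hunk above can be sketched standalone. This is only an illustration of the pattern, not the PR's code: it uses the stdlib `sqlite3` module instead of SQLAlchemy, and the `task_state` table layout, the integer `expires_at` column, and the helper names `delete_expired_batched` and `make_demo_db` are all assumptions made up for the example.

```python
import sqlite3


def delete_expired_batched(conn: sqlite3.Connection, now: int, batch_size: int) -> int:
    """Delete rows with expires_at < now in batches and return the total deleted.

    Rows whose expires_at is NULL never match the comparison, so they are kept.
    A batch_size <= 0 means "delete everything in a single pass".
    """
    total = 0
    while True:
        query = "SELECT id FROM task_state WHERE expires_at < ?"
        params = [now]
        if batch_size > 0:
            query += " LIMIT ?"
            params.append(batch_size)
        ids = [row[0] for row in conn.execute(query, params)]
        if not ids:
            break
        # One placeholder per id; very large unbatched passes could exceed
        # SQLite's bound-parameter limit, which is one reason to batch.
        placeholders = ",".join("?" * len(ids))
        conn.execute(f"DELETE FROM task_state WHERE id IN ({placeholders})", ids)
        conn.commit()
        total += len(ids)
        # A short batch means nothing is left to delete.
        if batch_size <= 0 or len(ids) < batch_size:
            break
    return total


def make_demo_db() -> sqlite3.Connection:
    """Hypothetical fixture: 5 expired rows, 2 future rows, 1 row that never expires."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_state (id INTEGER PRIMARY KEY, expires_at INTEGER)")
    rows = [(10,)] * 5 + [(1000,)] * 2 + [(None,)]
    conn.executemany("INSERT INTO task_state (expires_at) VALUES (?)", rows)
    conn.commit()
    return conn
```

Committing after each batch keeps individual transactions short, at the cost of the cleanup not being atomic as a whole.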



##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -122,3 +122,14 @@ async def aclear(self, scope: StateScope, *, all_map_indices: bool = False) -> N
         scope are cleared. Pass ``all_map_indices=True`` to wipe state across every
         mapped instance of the task. For ``AssetScope`` the flag has no effect.
         """
+
+    def cleanup(self) -> None:
+        """
+        Remove expired and orphaned state records.
+
+        This is a no-op by default. Custom backends override this to implement their own
+        retention policy. The backend is responsible for reading any relevant config (e.g.
+        ``[state_store] default_retention_days``) and deciding what to delete.
+        Airflow does not call this from any standard job — the scheduler triggers it via
+        ``call_regular_interval`` for the default backend.

Review Comment:
   Do we still have this?
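
To make the contract in that docstring concrete, here is a minimal sketch of a no-op base `cleanup()` with a backend that overrides it. The class names `BaseStateBackend` and `InMemoryStateBackend`, the `clock` parameter, and the in-memory storage are hypothetical and exist only for this example; they are not the classes in this PR. The sketch follows the rule quoted in the metastore hunk that a retention of 0 days means rows never expire.

```python
from datetime import datetime, timedelta, timezone


class BaseStateBackend:
    """Hypothetical base class: cleanup() is a no-op unless a backend overrides it."""

    def cleanup(self) -> None:
        return None


class InMemoryStateBackend(BaseStateBackend):
    """Hypothetical backend that applies its own retention policy."""

    def __init__(self, retention_days: int, clock=lambda: datetime.now(timezone.utc)):
        self.retention_days = retention_days
        self._clock = clock  # injectable so the example is deterministic
        self._rows = {}

    def set(self, key: str, value) -> None:
        # The expiry is computed at write time; None means "never expires".
        expires_at = None
        if self.retention_days > 0:
            expires_at = self._clock() + timedelta(days=self.retention_days)
        self._rows[key] = (value, expires_at)

    def cleanup(self) -> None:
        now = self._clock()
        expired = [
            key
            for key, (_, expires_at) in self._rows.items()
            if expires_at is not None and expires_at < now
        ]
        for key in expired:
            del self._rows[key]
```

Who calls `cleanup()` and how often is left open here, since that is exactly the question for this docstring.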



##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -122,3 +122,14 @@ async def aclear(self, scope: StateScope, *, all_map_indices: bool = False) -> N
         scope are cleared. Pass ``all_map_indices=True`` to wipe state across every
         mapped instance of the task. For ``AssetScope`` the flag has no effect.
         """
+
+    def cleanup(self) -> None:
+        """
+        Remove expired and orphaned state records.
+
+        This is a no-op by default. Custom backends override this to implement their own
+        retention policy. The backend is responsible for reading any relevant config (e.g.
+        ``[state_store] default_retention_days``) and deciding what to delete.
+        Airflow does not call this from any standard job — the scheduler triggers it via
+        ``call_regular_interval`` for the default backend.

Review Comment:
   I remember we had something in the scheduler in the previous review. Did we remove it, or is it in another PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

