ashb commented on code in PR #68438:
URL: https://github.com/apache/airflow/pull/68438#discussion_r3403050321


##########
airflow-core/src/airflow/cli/commands/state_store_command.py:
##########
@@ -19,18 +19,18 @@
 import logging
 
 from airflow.state import get_state_backend
-from airflow.state.metastore import MetastoreStoreBackend
+from airflow.state.metastore import MetastoreStateStoreBackend

Review Comment:
   So this one, I think could remain as `MetastoreBackend`, since we are 
already in the `airflow.state` package.



##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -135,9 +141,9 @@ def set(
             assert session is not None
         match scope:
             case TaskScope():
-                self._set_task_store(scope, key, value, expires_at=expires_at, 
session=session)
+                self._set_task_state_store(scope, key, value, 
expires_at=expires_at, session=session)

Review Comment:
   or
   ```suggestion
                   self._store_task_state(scope, key, value, 
expires_at=expires_at, session=session)
   ```



##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -135,9 +141,9 @@ def set(
             assert session is not None
         match scope:
             case TaskScope():
-                self._set_task_store(scope, key, value, expires_at=expires_at, 
session=session)
+                self._set_task_state_store(scope, key, value, 
expires_at=expires_at, session=session)

Review Comment:
   ```suggestion
                   self._set_task_state(scope, key, value, 
expires_at=expires_at, session=session)
   ```
   
   We are definitionally in the state store already, and we aren't setting the 
state store, but setting/storing state.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -86,7 +86,7 @@
     TaskInletAssetReference,
     TaskOutletAssetReference,
 )
-from airflow.models.asset_store import AssetStoreModel
+from airflow.models.asset_state_store import AssetStateStoreModel

Review Comment:
   Nit: why is the scheduler importing this directly rather than going via the 
store backend?



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3545,7 +3545,7 @@ def _update_asset_orphanage(self, *, session: Session = 
NEW_SESSION) -> None:
 
         self._orphan_unreferenced_assets(orphan_query, session=session)
         self._activate_referenced_assets(activate_query, session=session)
-        self._cleanup_orphaned_asset_store(session=session)
+        self._cleanup_orphaned_asset_state_store(session=session)

Review Comment:
   nit/observation: there is no cleanup of orphaned task state store entries -- 
should there be?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to