ashb commented on code in PR #68438:
URL: https://github.com/apache/airflow/pull/68438#discussion_r3403050321
##########
airflow-core/src/airflow/cli/commands/state_store_command.py:
##########
@@ -19,18 +19,18 @@
import logging
from airflow.state import get_state_backend
-from airflow.state.metastore import MetastoreStoreBackend
+from airflow.state.metastore import MetastoreStateStoreBackend
Review Comment:
So this one, I think could remain as `MetastoreBackend`, since we are
already in the `airflow.state` package.
##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -135,9 +141,9 @@ def set(
assert session is not None
match scope:
case TaskScope():
- self._set_task_store(scope, key, value, expires_at=expires_at,
session=session)
+ self._set_task_state_store(scope, key, value,
expires_at=expires_at, session=session)
Review Comment:
or
```suggestion
self._store_task_state(scope, key, value,
expires_at=expires_at, session=session)
```
##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -135,9 +141,9 @@ def set(
assert session is not None
match scope:
case TaskScope():
- self._set_task_store(scope, key, value, expires_at=expires_at,
session=session)
+ self._set_task_state_store(scope, key, value,
expires_at=expires_at, session=session)
Review Comment:
```suggestion
self._set_task_state(scope, key, value,
expires_at=expires_at, session=session)
```
We are definitionally in the state store already, and we aren't setting the
state store, but setting/storing state.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -86,7 +86,7 @@
TaskInletAssetReference,
TaskOutletAssetReference,
)
-from airflow.models.asset_store import AssetStoreModel
+from airflow.models.asset_state_store import AssetStateStoreModel
Review Comment:
Nit: why is the scheduler importing this directly rather than going via the
store backend?
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3545,7 +3545,7 @@ def _update_asset_orphanage(self, *, session: Session =
NEW_SESSION) -> None:
self._orphan_unreferenced_assets(orphan_query, session=session)
self._activate_referenced_assets(activate_query, session=session)
- self._cleanup_orphaned_asset_store(session=session)
+ self._cleanup_orphaned_asset_state_store(session=session)
Review Comment:
nit/observation: there is no cleanup of orphaned task state store entries --
should there be?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]