amoghrajesh commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3265144422
##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -186,3 +193,44 @@ def cleanup(self) -> None:
retention policy. The backend is responsible for reading any relevant config (e.g.
``[state_store] default_retention_days``) and deciding what to delete.
"""
+
+ def serialize_task_state_value(self, *, value: str, key: str, ti_id: str) -> str:
+ """
+ Serialize a task state value before it is sent to the execution API for db persistence.
+
+ Called by ``TaskStateAccessor.set()`` on the worker. The return value is what gets
+ stored in the DB — typically a reference path (e.g. an S3 key) rather than the
+ actual value. Default: return ``value`` unchanged.
+ """
+ return value
+
+ def deserialize_task_state_value(self, stored: str) -> str:
+ """
+ Resolve a stored task state string back to the actual value.
+
+ Called by ``TaskStateAccessor.get()`` after the stored string is retrieved from
+ the execution API. Default: return ``stored`` unchanged.
+ """
+ return stored
+
+ def serialize_asset_state_value(self, *, value: str, key: str, asset_ref: str) -> str:
Review Comment:
Handled in b78f33847a
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -565,30 +601,51 @@ def get(self, key: str) -> str | None:
if isinstance(resp, ErrorResponse) and resp.error != ErrorType.ASSET_STATE_NOT_FOUND:
raise AirflowRuntimeError(resp)
if isinstance(resp, AssetStateResult):
- return resp.value
+ stored = resp.value
+ # if custom backend is configured, the stored value in DB is a reference, fetch the actual value from
+ # custom backend using the reference
+ backend = _get_worker_state_backend()
+ return backend.deserialize_asset_state_value(stored) if backend else stored
return None
def set(self, key: str, value: str) -> None:
"""Write or overwrite the value for the given key."""
from airflow.sdk.execution_time.comms import SetAssetStateByName, SetAssetStateByUri, ToSupervisor
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+ # if custom backend is configured, store the value on the custom backend, and return the reference
+ # to the stored value to store in the DB
+ backend = _get_worker_state_backend()
+ asset_ref = self._name or self._uri or ""
+ stored = (
+ backend.serialize_asset_state_value(value=value, key=key, asset_ref=asset_ref)
+ if backend
+ else value
+ )
+
msg: ToSupervisor
if self._name:
- msg = SetAssetStateByName(name=self._name, key=key, value=value)
+ msg = SetAssetStateByName(name=self._name, key=key, value=stored)
elif self._uri:
- msg = SetAssetStateByUri(uri=self._uri, key=key, value=value)
+ msg = SetAssetStateByUri(uri=self._uri, key=key, value=stored)
SUPERVISOR_COMMS.send(msg)
def delete(self, key: str) -> None:
"""Delete a single key. No-op if the key does not exist."""
+ from airflow.sdk._shared.state import AssetScope
from airflow.sdk.execution_time.comms import (
DeleteAssetStateByName,
DeleteAssetStateByUri,
ToSupervisor,
)
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+ backend = _get_worker_state_backend()
+ # session=None signals worker-side: backend cleans up external storage only.
+ # DB reference is removed separately via comms below.
Review Comment:
Handled in b78f33847a
##########
task-sdk/src/airflow/sdk/configuration.py:
##########
@@ -210,6 +211,21 @@ def remove_all_read_configurations(self):
self.remove_section(section)
+def get_state_backend():
+ """
+ Get the state backend if configured via ``[workers] state_backend``.
+
+ Returns the instantiated backend, or ``None`` if not configured.
+ """
+ # Lazy import to trigger __getattr__ and lazy initialization
+ from airflow.sdk.configuration import conf
+
+ class_name = conf.get("workers", "state_backend", fallback="")
+ if not class_name:
+ return None
+ return import_string(class_name)()
Review Comment:
Handled in b78f33847a
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -440,11 +442,25 @@ def get(self, key, default: Any = NOTSET) -> Any:
raise
+@cache
Review Comment:
Handled in b78f33847a
##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -186,3 +193,44 @@ def cleanup(self) -> None:
retention policy. The backend is responsible for reading any relevant config (e.g.
``[state_store] default_retention_days``) and deciding what to delete.
"""
+
+ def serialize_task_state_value(self, *, value: str, key: str, ti_id: str) -> str:
Review Comment:
Handled in b78f33847a
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]