jason810496 commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3264258642


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -495,13 +515,26 @@ def set(self, key: str, value: str, *, retention: timedelta | None = None) -> No
         else:
             days = conf.getint("state_store", "default_retention_days")
             expires_at = None if days <= 0 else now + timedelta(days=days)
-        SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, value=value, expires_at=expires_at))
+
+        # if custom backend is configured, store the value on the custom backend, and return the reference
+        # to the stored value to store in the DB
+        backend = _get_worker_state_backend()
+        stored = (
+            backend.serialize_task_state_value(value=value, key=key, ti_id=str(self._ti_id))
+            if backend
+            else value
+        )
+
+        SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, value=stored, expires_at=expires_at))
 
     def delete(self, key: str) -> None:
         """Delete a single key. No-op if the key does not exist."""
         from airflow.sdk.execution_time.comms import DeleteTaskState
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
+        backend = _get_worker_state_backend()
+        if backend is not None:

Review Comment:
   How about having a property for the worker backend?
   
   
   ```suggestion
           if self.worker_backend:
               self.worker_backend.delete(self._scope, key)
   ```
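
A minimal sketch of what such a property could look like. `FakeBackend`, the simplified `TaskStateAccessor`, and the placeholder `_get_worker_state_backend` below are stand-ins for illustration only, not the PR's actual classes:

```python
from functools import cached_property


class FakeBackend:
    """Hypothetical stand-in for a custom worker state backend."""

    def __init__(self):
        self.deleted = []

    def delete(self, scope, key):
        # Record the call instead of touching real external storage.
        self.deleted.append((scope, key))


def _get_worker_state_backend():
    # Placeholder for the PR's cached loader; the real one returns None
    # when no custom backend is configured.
    return FakeBackend()


class TaskStateAccessor:
    """Simplified accessor, reduced to the parts relevant to the property."""

    def __init__(self, scope):
        self._scope = scope

    @cached_property
    def worker_backend(self):
        # Resolved once per accessor instance, then reused by get/set/delete.
        return _get_worker_state_backend()

    def delete(self, key):
        if self.worker_backend:
            self.worker_backend.delete(self._scope, key)
        # The DB reference would still be removed via SUPERVISOR_COMMS here.
```

With this shape, each method reads `self.worker_backend` instead of calling the loader inline.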



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -440,11 +442,25 @@ def get(self, key, default: Any = NOTSET) -> Any:
             raise
 
 
+@cache

Review Comment:
   Would it be better to avoid overloading `sdk.configuration`?
   Since `get_state_backend` is only used here, it seems we could consolidate the state backend initialization here, together with the cache. That would make the lifecycle of the state backend import clearer.



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1429,6 +1438,14 @@ def _handle_current_task_success(
     stats.incr("operator_successes", tags={**stats_tags, "operator_name": operator})
     stats.incr("ti_successes", tags=stats_tags)
 
+    # TODO: uncomment below once https://github.com/apache/airflow/pull/66699 is merged
+    # if conf.getboolean("state_store", "clear_on_success"):
+    #     log.info("Task state will be cleared by the server because clear_on_success is enabled.")
+    #
+    #     if _get_worker_state_backend() is not None:
+    #         # clear the task state keys for custom state backends configured on worker side
+    #         context["task_state"].clear()

Review Comment:
   May I double-check my understanding: once the TODO block here is uncommented, it will only clear the worker backend (the custom backend), and the following `ti_update_state` Execution API call will only clear the DB reference?
   
   Is that correct?
   
   
https://github.com/apache/airflow/blob/41a6436a1dd6ac82a1504db145e2958fd426a1fd/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L468-L491
    



##########
task-sdk/src/airflow/sdk/configuration.py:
##########
@@ -210,6 +211,21 @@ def remove_all_read_configurations(self):
             self.remove_section(section)
 
 
+def get_state_backend():
+    """
+    Get the state backend if configured via ``[workers] state_backend``.
+
+    Returns the instantiated backend, or ``None`` if not configured.
+    """
+    # Lazy import to trigger __getattr__ and lazy initialization
+    from airflow.sdk.configuration import conf
+
+    class_name = conf.get("workers", "state_backend", fallback="")
+    if not class_name:
+        return None
+    return import_string(class_name)()

Review Comment:
   Would it be better to add some validation, or a try/except around the import, so that exceptions are handled?
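
One possible shape for that validation, as a sketch only. `StateBackendConfigError` and `load_state_backend` are hypothetical names, and a plain `importlib` lookup stands in for `import_string`:

```python
from importlib import import_module


class StateBackendConfigError(Exception):
    """Hypothetical error raised for an invalid state_backend setting."""


def load_state_backend(class_name):
    """Return an instance of the configured backend, or None if unset."""
    if not class_name:
        return None
    module_path, _, attr = class_name.rpartition(".")
    if not module_path:
        raise StateBackendConfigError(
            f"state_backend must be a dotted path to a class, got {class_name!r}"
        )
    try:
        cls = getattr(import_module(module_path), attr)
    except (ImportError, AttributeError) as exc:
        # Surface the config key in the message instead of a bare import error.
        raise StateBackendConfigError(
            f"Could not import state_backend {class_name!r}"
        ) from exc
    if not isinstance(cls, type):
        raise StateBackendConfigError(f"state_backend {class_name!r} is not a class")
    return cls()
```

The intent is that a typo in the setting fails with a message naming the setting, rather than an import traceback surfacing in the middle of a task run.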



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -565,30 +601,51 @@ def get(self, key: str) -> str | None:
         if isinstance(resp, ErrorResponse) and resp.error != ErrorType.ASSET_STATE_NOT_FOUND:
             raise AirflowRuntimeError(resp)
         if isinstance(resp, AssetStateResult):
-            return resp.value
+            stored = resp.value
+            # if custom backend is configured, the stored value in DB is a reference, fetch the actual value from
+            # custom backend using the reference
+            backend = _get_worker_state_backend()
+            return backend.deserialize_asset_state_value(stored) if backend else stored
         return None
 
     def set(self, key: str, value: str) -> None:
         """Write or overwrite the value for the given key."""
         from airflow.sdk.execution_time.comms import SetAssetStateByName, SetAssetStateByUri, ToSupervisor
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
+        # if custom backend is configured, store the value on the custom backend, and return the reference
+        # to the stored value to store in the DB
+        backend = _get_worker_state_backend()
+        asset_ref = self._name or self._uri or ""
+        stored = (
+            backend.serialize_asset_state_value(value=value, key=key, asset_ref=asset_ref)
+            if backend
+            else value
+        )
+
         msg: ToSupervisor
         if self._name:
-            msg = SetAssetStateByName(name=self._name, key=key, value=value)
+            msg = SetAssetStateByName(name=self._name, key=key, value=stored)
         elif self._uri:
-            msg = SetAssetStateByUri(uri=self._uri, key=key, value=value)
+            msg = SetAssetStateByUri(uri=self._uri, key=key, value=stored)
         SUPERVISOR_COMMS.send(msg)
 
     def delete(self, key: str) -> None:
         """Delete a single key. No-op if the key does not exist."""
+        from airflow.sdk._shared.state import AssetScope
         from airflow.sdk.execution_time.comms import (
             DeleteAssetStateByName,
             DeleteAssetStateByUri,
             ToSupervisor,
         )
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
+        backend = _get_worker_state_backend()
+        # session=None signals worker-side: backend cleans up external storage only.
+        # DB reference is removed separately via comms below.

Review Comment:
   Would it be better to introduce a dedicated parameter to indicate that this is the worker-side backend, instead of relying on `session=None`?
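
To illustrate the idea, a sketch with an explicit keyword flag. `ExampleBackend`, its in-memory dicts, and the `worker_side` parameter name are all hypothetical, not the PR's actual signature:

```python
class ExampleBackend:
    """Hypothetical backend with in-memory stand-ins for external storage and DB refs."""

    def __init__(self):
        self.external = {("asset", "k"): "blob"}
        self.db_refs = {("asset", "k"): "ref"}

    def delete(self, scope, key, *, session=None, worker_side=False):
        # External storage is cleaned up in both modes.
        self.external.pop((scope, key), None)
        if worker_side:
            # Worker side: the DB reference is removed separately via comms.
            return
        if session is None:
            raise ValueError("session is required for server-side deletes")
        self.db_refs.pop((scope, key), None)
```

With an explicit flag, a server-side caller that forgets to pass a session gets an error instead of silently taking the worker-side path.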



##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -186,3 +193,44 @@ def cleanup(self) -> None:
         retention policy. The backend is responsible for reading any relevant config (e.g.
         ``[state_store] default_retention_days``) and deciding what to delete.
         """
+
+    def serialize_task_state_value(self, *, value: str, key: str, ti_id: str) -> str:
+        """
+        Serialize a task state value before it is sent to the execution API for db persistence.
+
+        Called by ``TaskStateAccessor.set()`` on the worker. The return value is what gets
+        stored in the DB — typically a reference path (e.g. an S3 key) rather than the
+        actual value. Default: return ``value`` unchanged.
+        """
+        return value
+
+    def deserialize_task_state_value(self, stored: str) -> str:
+        """
+        Resolve a stored task state string back to the actual value.
+
+        Called by ``TaskStateAccessor.get()`` after the stored string is retrieved from
+        the execution API. Default: return ``stored`` unchanged.
+        """
+        return stored
+
+    def serialize_asset_state_value(self, *, value: str, key: str, asset_ref: str) -> str:

Review Comment:
   Additionally, we might need to state explicitly that this function must return a deterministic key and should not rely on a timestamp or a runtime-generated UUID. Otherwise we might lose the correct reference.
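
For example, a deterministic reference could be derived purely from the inputs. This is a sketch only: the `store` dict stands in for external storage, and the `asset-state/` prefix and SHA-256 hashing are illustrative choices, not something the PR prescribes:

```python
import hashlib


def serialize_asset_state_value(*, value, key, asset_ref, store):
    """Write value to external storage and return a reference derived only from the inputs."""
    # Same (asset_ref, key) always maps to the same reference, so a later
    # write overwrites the earlier object instead of orphaning it.
    digest = hashlib.sha256(f"{asset_ref}\x00{key}".encode()).hexdigest()
    ref = f"asset-state/{digest}"
    store[ref] = value
    return ref
```

Since the reference depends only on `asset_ref` and `key`, any code path that knows those two values can recompute it without first reading the DB.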



##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -186,3 +193,44 @@ def cleanup(self) -> None:
         retention policy. The backend is responsible for reading any relevant config (e.g.
         ``[state_store] default_retention_days``) and deciding what to delete.
         """
+
+    def serialize_task_state_value(self, *, value: str, key: str, ti_id: str) -> str:

Review Comment:
   How about renaming the `serialize_task_state_value` and `deserialize_task_state_value` methods to something containing `ref` or `reference`, to avoid further ambiguity?
   
   What gets stored in the DB is actually a reference rather than the actual value.
   For example: `serialize_task_state_to_ref` / `deserialize_task_state_from_ref`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
