amoghrajesh commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3265069766


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -565,30 +601,51 @@ def get(self, key: str) -> str | None:
         if isinstance(resp, ErrorResponse) and resp.error != 
ErrorType.ASSET_STATE_NOT_FOUND:
             raise AirflowRuntimeError(resp)
         if isinstance(resp, AssetStateResult):
-            return resp.value
+            stored = resp.value
+            # if custom backend is configured, the stored value in DB is a 
reference, fetch the actual value from
+            # custom backend using the reference
+            backend = _get_worker_state_backend()
+            return backend.deserialize_asset_state_value(stored) if backend 
else stored
         return None
 
     def set(self, key: str, value: str) -> None:
         """Write or overwrite the value for the given key."""
         from airflow.sdk.execution_time.comms import SetAssetStateByName, 
SetAssetStateByUri, ToSupervisor
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
+        # if custom backend is configured, store the value on the custom 
backend, and return the reference
+        # to the stored value to store in the DB
+        backend = _get_worker_state_backend()
+        asset_ref = self._name or self._uri or ""
+        stored = (
+            backend.serialize_asset_state_value(value=value, key=key, 
asset_ref=asset_ref)
+            if backend
+            else value
+        )
+
         msg: ToSupervisor
         if self._name:
-            msg = SetAssetStateByName(name=self._name, key=key, value=value)
+            msg = SetAssetStateByName(name=self._name, key=key, value=stored)
         elif self._uri:
-            msg = SetAssetStateByUri(uri=self._uri, key=key, value=value)
+            msg = SetAssetStateByUri(uri=self._uri, key=key, value=stored)
         SUPERVISOR_COMMS.send(msg)
 
     def delete(self, key: str) -> None:
         """Delete a single key. No-op if the key does not exist."""
+        from airflow.sdk._shared.state import AssetScope
         from airflow.sdk.execution_time.comms import (
             DeleteAssetStateByName,
             DeleteAssetStateByUri,
             ToSupervisor,
         )
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
+        backend = _get_worker_state_backend()
+        # session=None signals worker-side: backend cleans up external storage 
only.
+        # DB reference is removed separately via comms below.

Review Comment:
   Ah old artefact. The comment was misleading. The DB is not skipped because 
of `session=None`, it's skipped because custom worker backends are designed to 
handle external storage only and don't interact with DB at all. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to