amoghrajesh commented on code in PR #67530:
URL: https://github.com/apache/airflow/pull/67530#discussion_r3308852711


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -648,11 +647,12 @@ def get(self, key: str) -> JsonValue:
         if isinstance(resp, AssetStateResult):
             stored = resp.value
             backend = _get_worker_state_backend()
-            if backend is not None:
-                # serialize_asset_state_to_ref always returns str by contract; stored contains the ref.
+            if backend is not None and isinstance(stored, dict) and stored.get("__type") == "ExternalState":
+                # unwrap the marker to get the ref, and retrieve the actual value from the backend using the ref
+                ref = stored["__var"]
                 if TYPE_CHECKING:
-                    assert isinstance(stored, str)
-                return backend.deserialize_asset_state_from_ref(stored)
+                    assert isinstance(ref, str)
+                return backend.deserialize_asset_state_from_ref(ref)
             return stored

Review Comment:
   On the rolling-upgrade question: pre-envelope rows cannot exist. The custom 
state backend feature is new in 3.3 and has never shipped, so no deployed 
cluster has rows written in the old format. No backwards-compat branch is 
needed.
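
For reference, a minimal standalone sketch of the unwrap logic in the hunk above. `FakeBackend` and `resolve_stored` are hypothetical names used only for illustration; they are not part of the task SDK, and the real backend interface may differ. Only the `__type` / `__var` envelope keys and the `deserialize_asset_state_from_ref` method name are taken from the diff.

```python
from typing import Any, Optional


class FakeBackend:
    """Hypothetical stand-in for a worker state backend (not Airflow's class)."""

    def __init__(self, blobs: dict) -> None:
        # Maps a ref string to the value stored externally under that ref.
        self._blobs = blobs

    def deserialize_asset_state_from_ref(self, ref: str) -> Any:
        return self._blobs[ref]


def resolve_stored(stored: Any, backend: Optional[FakeBackend]) -> Any:
    """Unwrap an ExternalState envelope if present; otherwise return the value unchanged."""
    if (
        backend is not None
        and isinstance(stored, dict)
        and stored.get("__type") == "ExternalState"
    ):
        # The envelope carries the ref under "__var"; the backend holds the real value.
        return backend.deserialize_asset_state_from_ref(stored["__var"])
    # Inline values, or no backend configured: pass through as-is.
    return stored


backend = FakeBackend({"ref-1": {"count": 3}})
envelope = {"__type": "ExternalState", "__var": "ref-1"}
unwrapped = resolve_stored(envelope, backend)
inline = resolve_stored("plain-value", backend)
```

With this shape, an inline value that is not an envelope is returned untouched even when a backend is configured, which is the case the old `backend is not None` check did not distinguish.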


