amoghrajesh commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3271695153


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -495,13 +519,26 @@ def set(self, key: str, value: str, *, retention: timedelta | None = None) -> No
         else:
             days = conf.getint("state_store", "default_retention_days")
             expires_at = None if days <= 0 else now + timedelta(days=days)
-        SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, value=value, expires_at=expires_at))
+
+        # if custom backend is configured, store the value on the custom backend, and return the reference
+        # to the stored value to store in the DB
+        backend = _get_worker_state_backend()
+        stored = (
+            backend.serialize_task_state_to_ref(value=value, key=key, ti_id=str(self._ti_id))
+            if backend
+            else value
+        )
+
+        SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, value=stored, expires_at=expires_at))
 
     def delete(self, key: str) -> None:
         """Delete a single key. No-op if the key does not exist."""
         from airflow.sdk.execution_time.comms import DeleteTaskState
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
+        backend = _get_worker_state_backend()
+        if backend is not None:

Review Comment:
   Fixed: swapped the order so that comms (DB ref removal) now happens first,
   and backend cleanup second.
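
   For illustration only, the ordering can be sketched roughly as below. This
   is not the code in the PR: `FakeComms`, `FakeBackend`, `delete_ref` and
   `delete_state` are hypothetical stand-ins, and the message is a plain dict
   rather than the real `DeleteTaskState`. The presumed benefit is that if
   backend cleanup fails, the worst case is an orphaned stored value rather
   than a DB reference pointing at nothing.

```python
class FakeComms:
    """Hypothetical stand-in for SUPERVISOR_COMMS; records what was sent."""

    def __init__(self, events):
        self.events = events

    def send(self, msg):
        self.events.append(("comms", msg["key"]))


class FakeBackend:
    """Hypothetical stand-in for a custom state backend."""

    def __init__(self, events):
        self.events = events

    def delete_ref(self, key):
        # Assumed method name, used only for this sketch.
        self.events.append(("backend", key))


def delete_state(key, comms, backend=None):
    # 1. Remove the DB reference first, so nothing points at the payload.
    comms.send({"type": "DeleteTaskState", "key": key})
    # 2. Only then clean up the value held by the custom backend, if any.
    if backend is not None:
        backend.delete_ref(key)


events = []
delete_state("my_key", FakeComms(events), FakeBackend(events))
print(events)
```

   With no backend configured, only the comms step runs.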


