amoghrajesh commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3264729983
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -495,13 +515,26 @@ def set(self, key: str, value: str, *, retention:
timedelta | None = None) -> No
else:
days = conf.getint("state_store", "default_retention_days")
expires_at = None if days <= 0 else now + timedelta(days=days)
- SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key,
value=value, expires_at=expires_at))
+
+ # if custom backend is configured, store the value on the custom
backend, and return the reference
+ # to the stored value to store in the DB
+ backend = _get_worker_state_backend()
+ stored = (
+ backend.serialize_task_state_value(value=value, key=key,
ti_id=str(self._ti_id))
+ if backend
+ else value
+ )
+
+ SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key,
value=stored, expires_at=expires_at))
def delete(self, key: str) -> None:
"""Delete a single key. No-op if the key does not exist."""
from airflow.sdk.execution_time.comms import DeleteTaskState
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+ backend = _get_worker_state_backend()
+ if backend is not None:
Review Comment:
Since `_get_worker_state_backend()` is cached, its result is already computed
once globally, which is functionally the same as a property.
I'm keeping the current approach to avoid coupling the backend lifecycle to
individual accessor instances.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]