kaxil commented on code in PR #67530:
URL: https://github.com/apache/airflow/pull/67530#discussion_r3307394111
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -512,14 +512,13 @@ def get(self, key: str) -> JsonValue:
raise AirflowRuntimeError(resp)
if isinstance(resp, TaskStateResult):
stored = resp.value
- # if custom backend is configured, the stored value in DB is a reference, fetch the actual value from
- # custom backend using the reference
backend = _get_worker_state_backend()
- if backend is not None:
- # serialize_task_state_to_ref always returns str by contract; stored contains the ref.
+ if backend is not None and isinstance(stored, dict) and stored.get("__type") == "ExternalState":
+ # unwrap the marker to get the ref, and retrieve the actual value from the backend using the ref
+ ref = stored["__var"]
if TYPE_CHECKING:
- assert isinstance(stored, str)
- return backend.deserialize_task_state_from_ref(stored)
+ assert isinstance(ref, str)
+ return backend.deserialize_task_state_from_ref(ref)
return stored
Review Comment:
When `_get_worker_state_backend()` returns a backend but `stored` is not an
`ExternalState` envelope (`not isinstance(stored, dict)`, missing `__type`, or
`__type != "ExternalState"`), this falls through to `return stored` on line 522
-- the raw value is returned to the user as if no backend were configured. This
matters during rolling upgrades / mixed-version workers: a row written by an
older worker (pre-#67530) stored the raw ref string directly (no envelope).
After upgrade, the new worker reads it back, fails the `isinstance(stored,
dict)` check, and returns the *ref string itself* to user code rather than
dereferencing it through the backend. The pre-#67530 code path explicitly
called `backend.deserialize_task_state_from_ref(stored)` in that case.
Can you confirm the intended behavior here? If pre-envelope rows aren't
expected to exist (fresh schema with no migration), worth a comment or a log
warning so the silent fallthrough doesn't mask a future bug. If pre-envelope
rows *can* exist (rolling deploy), this needs a fallback to the old
`deserialize_task_state_from_ref(stored)` path for `isinstance(stored, str)`.
Same issue at line 656 for asset state.
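For the rolling-deploy case, a minimal sketch of the fallback could look like the following. `resolve_task_state` and `InMemoryBackend` are hypothetical names used only for illustration; only `deserialize_task_state_from_ref` and the envelope keys come from the diff. The sketch assumes that, while a backend is configured, any bare string in the DB is a pre-envelope ref.

```python
from typing import Any, Optional


class InMemoryBackend:
    """Hypothetical stand-in for a custom state backend (illustration only)."""

    def __init__(self, blobs: dict[str, Any]) -> None:
        self._blobs = blobs

    def deserialize_task_state_from_ref(self, ref: str) -> Any:
        return self._blobs[ref]


def resolve_task_state(stored: Any, backend: Optional[InMemoryBackend]) -> Any:
    """Resolve a value read from the DB, tolerating pre-envelope rows."""
    if backend is None:
        return stored
    if isinstance(stored, dict) and stored.get("__type") == "ExternalState":
        # New-style row: unwrap the envelope, then dereference.
        return backend.deserialize_task_state_from_ref(stored["__var"])
    if isinstance(stored, str):
        # Assumed legacy row: the old write path stored the bare ref string.
        return backend.deserialize_task_state_from_ref(stored)
    # Anything else (numbers, lists, plain dicts) is returned unchanged.
    return stored


backend = InMemoryBackend({"ref-1": {"rows": 42}})
print(resolve_task_state({"__type": "ExternalState", "__var": "ref-1"}, backend))
print(resolve_task_state("ref-1", backend))
```

The trade-off: a string written while no backend was configured would also be treated as a ref once a backend is turned on, so this branch is only safe if that situation cannot occur.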
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -512,14 +512,13 @@ def get(self, key: str) -> JsonValue:
raise AirflowRuntimeError(resp)
if isinstance(resp, TaskStateResult):
stored = resp.value
- # if custom backend is configured, the stored value in DB is a reference, fetch the actual value from
- # custom backend using the reference
backend = _get_worker_state_backend()
- if backend is not None:
- # serialize_task_state_to_ref always returns str by contract; stored contains the ref.
+ if backend is not None and isinstance(stored, dict) and stored.get("__type") == "ExternalState":
Review Comment:
The envelope shape `{"__type", "__var"}` aliases the wire format that
`airflow-core/src/airflow/serialization/serialized_objects.py` uses for
serializing complex objects (`OLD_TYPE = "__type"`, `OLD_DATA = "__var"`).
Reusing those keys here creates two classes of collisions worth thinking
through.
First, user-supplied dicts with the same shape: a user who does
`task_state.set("k", {"__type": "ExternalState", "__var": "some-string"})`
will, on read, have their dict misinterpreted as a backend ref and passed to
`backend.deserialize_task_state_from_ref("some-string")`. There's no
`FORBIDDEN_XCOM_KEYS`-equivalent guard on `task_state.set()` to block this.
Even without a backend the value still appears legitimate, so the trap is
silent until the deployment turns a backend on.
Second, cross-mixing with `BaseSerialization`: if anything downstream (UI
renderer, audit log, copying state to XCom) ever calls
`BaseSerialization.deserialize` on this dict, it will try to resolve
`"ExternalState"` as a registered class.
The PR title mentions "for UI clarity" -- and the UI is the *consumer* of
this envelope. So the question is: what's the rationale for adopting
`__type`/`__var` (Airflow's internal serialization envelope) here, vs. a
distinct namespace like `{"__airflow_state_ref__": "..."}` that can't collide
with either user values or `BaseSerialization`? If it's intentional alignment
with the UI's existing renderer, worth a comment pointing to that. If it's
incidental, the namespace collision is worth resolving now while there's no
on-disk data.
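To make the first collision concrete, here is a small sketch comparing the check as written with a dedicated-key envelope. `STATE_REF_KEY`, `wrap_ref`, `unwrap_ref`, and `is_external_ref_current` are hypothetical names for illustration; the key string is the one floated above, not something that exists in Airflow.

```python
from typing import Any, Optional

# Hypothetical dedicated key, taken from the suggestion above.
STATE_REF_KEY = "__airflow_state_ref__"


def is_external_ref_current(stored: Any) -> bool:
    """The envelope check as written in this PR."""
    return isinstance(stored, dict) and stored.get("__type") == "ExternalState"


def wrap_ref(ref: str) -> dict[str, str]:
    """Wrap a backend ref using the dedicated key."""
    return {STATE_REF_KEY: ref}


def unwrap_ref(stored: Any) -> Optional[str]:
    """Return the ref only for a dict that has exactly the dedicated key."""
    if isinstance(stored, dict) and set(stored) == {STATE_REF_KEY}:
        ref = stored[STATE_REF_KEY]
        return ref if isinstance(ref, str) else None
    return None


# A user value that happens to match the serialization-style shape.
user_value = {"__type": "ExternalState", "__var": "some-string"}
print(is_external_ref_current(user_value))  # True: treated as a backend ref
print(unwrap_ref(user_value))               # None: left alone as a user value
print(unwrap_ref(wrap_ref("ref-1")))        # ref-1
```

A user could still write the dedicated key on purpose, so a guard in `set()` would be needed to rule that out entirely; the dedicated key mainly removes the overlap with `BaseSerialization` and with accidental lookalikes.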
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -512,14 +512,13 @@ def get(self, key: str) -> JsonValue:
raise AirflowRuntimeError(resp)
if isinstance(resp, TaskStateResult):
stored = resp.value
- # if custom backend is configured, the stored value in DB is a reference, fetch the actual value from
- # custom backend using the reference
backend = _get_worker_state_backend()
- if backend is not None:
- # serialize_task_state_to_ref always returns str by contract; stored contains the ref.
+ if backend is not None and isinstance(stored, dict) and stored.get("__type") == "ExternalState":
+ # unwrap the marker to get the ref, and retrieve the actual value from the backend using the ref
+ ref = stored["__var"]
Review Comment:
`stored["__var"]` raises a bare `KeyError` if a malformed envelope (e.g.
`{"__type": "ExternalState"}` with no `__var`) ever lands in the DB -- corrupt
row, partial write, schema drift on a future backend. Bare `KeyError`
propagating out of `TaskStateAccessor.get()` will be hard to diagnose from the
user's traceback.
Minor: `ref = stored.get("__var")` plus an explicit `if not isinstance(ref,
str): raise AirflowRuntimeError(...)` (or similar) gives a typed, identifiable
error. Same on line 652 for the asset-state path.
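A rough sketch of that guard, for reference. `MalformedExternalStateError` and `extract_ref` are hypothetical; in the real code the raise would presumably use `AirflowRuntimeError` or whatever typed error fits, which I have not tried to reproduce here.

```python
from typing import Any


class MalformedExternalStateError(RuntimeError):
    """Hypothetical stand-in for AirflowRuntimeError in this sketch."""


def extract_ref(stored: dict[str, Any], key: str) -> str:
    """Pull the ref out of an ExternalState envelope, failing with a typed error."""
    ref = stored.get("__var")
    if not isinstance(ref, str):
        # Covers both a missing "__var" (ref is None) and a non-string value.
        raise MalformedExternalStateError(
            f"Malformed ExternalState envelope for key {key!r}: "
            f"expected a str under '__var', got {type(ref).__name__}"
        )
    return ref


print(extract_ref({"__type": "ExternalState", "__var": "ref-1"}, "k"))
```

Including the state key in the message gives the user something to search for in the DB, which a bare `KeyError: '__var'` does not.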
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -548,12 +547,12 @@ def set(self, key: str, value: JsonValue, *, retention: timedelta | None = None)
# if custom backend is configured, store the value on the custom backend, and return the reference
# to the stored value to store in the DB
+ stored: JsonValue = value
backend = _get_worker_state_backend()
- stored = (
- backend.serialize_task_state_to_ref(value=value, key=key, ti_id=str(self._ti_id))
- if backend
- else value
- )
+ if backend is not None:
+ # decorate the value with a marker to indicate that it's stored externally, and include the ref to the external storage
+ ref = backend.serialize_task_state_to_ref(value=value, key=key, ti_id=str(self._ti_id))
+ stored = {"__type": "ExternalState", "__var": ref}
Review Comment:
The string literals `"__type"`, `"__var"`, and `"ExternalState"` appear four
times across this file (516/518, 555, 650/652, 670) and form the wire contract
between the worker SDK and any downstream consumer (UI, future server-side
reader). Hoisting them to module-level constants -- e.g. `_EXTERNAL_STATE_TYPE
= "ExternalState"`, `_EXTERNAL_STATE_TYPE_KEY = "__type"`,
`_EXTERNAL_STATE_REF_KEY = "__var"` -- and writing a small helper like
`_wrap_external_ref(ref) -> dict` / `_unwrap_external_ref(stored) -> str |
None` will prevent a single typo (`"__var"` -> `"_var"`, `"ExternalState"` ->
`"ExternalSate"`) from silently breaking the round-trip in one direction, make
the contract visible to the UI / server-side renderer that consumes this
format, and be the natural place to add the malformed-envelope guard from the
comment on line 518.
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -661,15 +661,13 @@ def set(self, key: str, value: JsonValue) -> None:
from airflow.sdk.execution_time.comms import SetAssetStateByName, SetAssetStateByUri, ToSupervisor
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
- # if custom backend is configured, store the value on the custom backend, and return the reference
- # to the stored value to store in the DB
backend = _get_worker_state_backend()
asset_ref = self._name or self._uri or ""
- stored = (
- backend.serialize_asset_state_to_ref(value=value, key=key, asset_ref=asset_ref)
- if backend
- else value
- )
+ stored: JsonValue = value
+ if backend is not None:
+ # decorate the value with a marker to indicate that it's stored externally, and include the ref to the external storage
+ ref = backend.serialize_asset_state_to_ref(value=value, key=key, asset_ref=asset_ref)
+ stored = {"__type": "ExternalState", "__var": ref}
Review Comment:
The envelope is now a wire-protocol contract between the SDK and any
consumer of `task_state`/`asset_state` values (the UI in this PR, but also any
future server-side reader or third-party state backend implementer). Right now
that contract is encoded only as four duplicated dict literals across this one
file.
Worth documenting in `BaseStateBackend`
(shared/state/src/airflow_shared/state/__init__.py) -- a brief note in the
class docstring or on `serialize_task_state_to_ref` /
`serialize_asset_state_to_ref` -- that the returned ref string is *wrapped by
the worker* into `{"__type": "ExternalState", "__var": <ref>}` before
persistence, so implementers don't try to wrap it themselves. The UI
consumer / renderer should likewise carry a comment pointing back to this
envelope shape, so that changes to the envelope here don't silently break the UI.
(The PR description says "for UI clarity" but doesn't link the UI consumer.
Could you add a link to the UI PR/issue in the description so the rollout
sequence is reviewable? Otherwise it's hard to verify the envelope shape
actually matches what the renderer expects.)
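As an illustration of the kind of docstring note I mean, here is a sketch. `StateBackendSketch` and `DictBackend` are made-up stand-ins, not the real `BaseStateBackend`; the keyword names mirror the call in this diff, but the actual signature (for example whether the arguments are keyword-only) is an assumption.

```python
from abc import ABC, abstractmethod
from typing import Any


class StateBackendSketch(ABC):
    """
    Illustrative stand-in for ``BaseStateBackend`` (not the real class).

    Implementers return a plain ref string. The worker wraps that string as
    ``{"__type": "ExternalState", "__var": <ref>}`` before persisting it, so
    implementations must not wrap the ref themselves.
    """

    @abstractmethod
    def serialize_task_state_to_ref(self, *, value: Any, key: str, ti_id: str) -> str:
        """Store ``value`` externally and return the bare ref string."""


class DictBackend(StateBackendSketch):
    """Toy implementer that keeps values in a dict keyed by the ref."""

    def __init__(self) -> None:
        self.blobs: dict[str, Any] = {}

    def serialize_task_state_to_ref(self, *, value: Any, key: str, ti_id: str) -> str:
        ref = f"{ti_id}/{key}"
        self.blobs[ref] = value
        return ref  # bare string; the worker adds the envelope


demo_backend = DictBackend()
demo_ref = demo_backend.serialize_task_state_to_ref(value={"n": 1}, key="k", ti_id="ti-1")
print(demo_ref)  # ti-1/k
```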
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -648,11 +647,12 @@ def get(self, key: str) -> JsonValue:
if isinstance(resp, AssetStateResult):
stored = resp.value
backend = _get_worker_state_backend()
- if backend is not None:
- # serialize_asset_state_to_ref always returns str by contract; stored contains the ref.
+ if backend is not None and isinstance(stored, dict) and stored.get("__type") == "ExternalState":
+ # unwrap the marker to get the ref, and retrieve the actual value from the backend using the ref
+ ref = stored["__var"]
if TYPE_CHECKING:
- assert isinstance(stored, str)
- return backend.deserialize_asset_state_from_ref(stored)
+ assert isinstance(ref, str)
+ return backend.deserialize_asset_state_from_ref(ref)
return stored
Review Comment:
Same silent fallthrough as the task-state read at line 522 -- if a backend
is configured but `stored` isn't an `ExternalState` envelope, the raw stored
value is returned to user code without going through
`backend.deserialize_asset_state_from_ref()`.
Same question applies: are pre-envelope rows expected to exist (rolling
upgrade), or is this a fresh schema? Either a backwards-compat branch for
`isinstance(stored, str)` or a log/warning that this code path was hit is worth
adding.
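If the answer is "fresh schema, pre-envelope rows should never exist", the warning variant could be as small as the sketch below. `resolve_asset_state`, `DictAssetBackend`, and the logger name are hypothetical; only `deserialize_asset_state_from_ref` and the envelope keys come from the diff.

```python
import logging
from typing import Any, Optional

log = logging.getLogger("asset_state_sketch")  # hypothetical logger name


class DictAssetBackend:
    """Hypothetical stand-in for a custom state backend (illustration only)."""

    def __init__(self, blobs: dict[str, Any]) -> None:
        self._blobs = blobs

    def deserialize_asset_state_from_ref(self, ref: str) -> Any:
        return self._blobs[ref]


def resolve_asset_state(stored: Any, backend: Optional[DictAssetBackend]) -> Any:
    """Dereference envelopes; warn when a backend is set but the row is not one."""
    if backend is None:
        return stored
    if isinstance(stored, dict) and stored.get("__type") == "ExternalState":
        return backend.deserialize_asset_state_from_ref(stored["__var"])
    # Backend configured but no envelope: make the fallthrough visible.
    log.warning(
        "State backend is configured but stored value is not an ExternalState "
        "envelope (type=%s); returning it unchanged",
        type(stored).__name__,
    )
    return stored


asset_backend = DictAssetBackend({"ref-1": [1, 2, 3]})
print(resolve_asset_state({"__type": "ExternalState", "__var": "ref-1"}, asset_backend))
```

The same shape would apply to the task-state read; sharing one helper between the two accessors would keep them from drifting apart.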