kaxil commented on code in PR #67547:
URL: https://github.com/apache/airflow/pull/67547#discussion_r3307553783
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py:
##########
@@ -85,7 +86,9 @@ def list_asset_states(
session=session,
)
rows = session.execute(paginated).all()
- entries = [AssetStateResponse(key=r.key, value=r.value, updated_at=r.updated_at) for r in rows]
+ entries = [
+ AssetStateResponse(key=r.key, value=json.loads(r.value), updated_at=r.updated_at) for r in rows
Review Comment:
Same `json.loads` 500 risk noted on `task_state.py:92` -- one bad row in the
asset's state table will 500 the whole list endpoint. Worth defensive decoding
here, especially since asset state is meant to be a long-lived watermark (so
legacy rows from pre-#67418 are more likely to still be around than transient
task state).
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py:
##########
@@ -87,7 +88,9 @@ def list_task_states(
)
rows = session.execute(paginated).all()
entries = [
- TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at, expires_at=r.expires_at)
+ TaskStateResponse(
+ key=r.key, value=json.loads(r.value), updated_at=r.updated_at, expires_at=r.expires_at
Review Comment:
`json.loads(r.value)` will raise `JSONDecodeError` on any row whose `value`
column is not valid JSON, and FastAPI surfaces that as a generic 500. Two ways
this hits prod:
1. Rows written by the execution API between #66073 (2026-05-04) and #67418
(2026-05-25) stored raw strings; #67418 switched to `json.dumps(...)` but
didn't migrate existing rows. Any deployment that ran a build from that window
may still hold rows that now fail to decode on read.
2. The `BaseStateBackend` interface only requires `set(scope, key, value:
str, ...)`. A custom backend that stores its own value format (e.g.,
escaped/quoted, msgpack-decoded-to-str, etc.) would have worked under the old
`value: str` contract and now breaks.
In a list endpoint this is worse than in `get`: one bad row poisons the
whole page, so users can't even paginate past it.
Suggest wrapping the decode in a try/except (skip+log, or return a sentinel)
so a single legacy/odd row doesn't 500 the whole listing. Same pattern needed
at `get_task_state` (line 133), and in `asset_state.py` at lines 90 and 121.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py:
##########
@@ -115,7 +118,7 @@ def get_asset_state(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Asset state key {key!r} not found",
)
- return AssetStateResponse(key=row.key, value=row.value, updated_at=row.updated_at)
+ return AssetStateResponse(key=row.key, value=json.loads(row.value), updated_at=row.updated_at)
Review Comment:
Same `json.loads` 500 risk as the list endpoint (line 90). A legacy or
custom-backend row turns `GET` into a 500.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py:
##########
@@ -127,7 +130,7 @@ def get_task_state(
detail=f"Task state key {key!r} not found",
)
return TaskStateResponse(
- key=row.key, value=row.value, updated_at=row.updated_at, expires_at=row.expires_at
+ key=row.key, value=json.loads(row.value), updated_at=row.updated_at, expires_at=row.expires_at
Review Comment:
Same `json.loads` 500 risk as the list endpoint above (see comment on line
92). A row written by an execution-API build between #66073 and #67418, or by a
custom `BaseStateBackend`, will turn a `GET` into a 500 instead of returning
the value. Worth catching `JSONDecodeError` explicitly here too.
##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py:
##########
@@ -42,4 +46,15 @@ class TaskStateCollectionResponse(BaseModel):
class TaskStateBody(StrictBaseModel):
"""Request body for setting a task state value."""
- value: str = Field(max_length=65535)
+ value: JsonValue
+
+ @field_validator("value")
+ @classmethod
+ def value_is_json_representable(cls, v: JsonValue) -> JsonValue:
+ if v is None:
+ raise ValueError("value cannot be null")
+ if isinstance(v, float) and not math.isfinite(v):
+ raise ValueError("value must be a finite number; NaN and Inf are not JSON representable")
+ if len(json.dumps(v)) > _MAX_SERIALIZED_BYTES:
Review Comment:
The non-finite check on line 56 only catches top-level floats. Nested
non-finite values pass through:
```python
TaskStateBody.model_validate({"value": {"a": float("nan")}}) # passes
# then json.dumps({"a": float("nan")}) -> '{"a": NaN}' -- not valid JSON
```
Simpler + complete fix: drop the `isinstance(v, float) and not
math.isfinite(v)` branch and pass `allow_nan=False` to the size check:
```python
try:
    serialized = json.dumps(v, allow_nan=False)
except ValueError as e:
    raise ValueError(
        "value contains non-finite numbers; NaN and Inf are not JSON representable"
    ) from e
if len(serialized) > _MAX_SERIALIZED_BYTES:
    raise ValueError(...)
```
That catches NaN/Inf anywhere in the structure with a single pass and the
existing top-level guard becomes redundant. Same fix needed in
`asset_state.py:57` -- the two validators are line-for-line identical.
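
To make the nested case concrete, here is a standalone sketch of the same
check outside Pydantic. `check_json_value`, the limit value, and the
size-error message are assumptions for illustration only; the PR defines its
own `_MAX_SERIALIZED_BYTES` and message.

```python
import json
from typing import Any

# Hypothetical limit for illustration; the PR defines its own constant.
_MAX_SERIALIZED_BYTES = 65535


def check_json_value(v: Any) -> Any:
    """Reject null, non-finite floats at any depth, and oversized payloads."""
    if v is None:
        raise ValueError("value cannot be null")
    try:
        # allow_nan=False makes json.dumps raise on NaN/Inf anywhere in v.
        serialized = json.dumps(v, allow_nan=False)
    except ValueError as e:
        raise ValueError(
            "value contains non-finite numbers; NaN and Inf are not JSON representable"
        ) from e
    # With the default ensure_ascii=True the output is ASCII, so len() is bytes.
    if len(serialized) > _MAX_SERIALIZED_BYTES:
        raise ValueError("value is too large")  # placeholder message
    return v
```

Calling `check_json_value({"a": float("nan")})` raises here, whereas the
top-level `isinstance(v, float)` guard alone would let it through.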
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py:
##########
@@ -191,6 +193,18 @@ def test_overwrites_existing_key(self, test_client):
def test_empty_body_returns_422(self, test_client):
assert test_client.put(f"{BASE_URL}/job_id", json={}).status_code == 422
+ def test_null_value_returns_422(self, test_client):
+ assert test_client.put(f"{BASE_URL}/job_id", json={"value": None}).status_code == 422
+
+ @pytest.mark.parametrize("bad_float", [float("nan"), float("inf"), float("-inf")])
+ def test_non_finite_float_returns_422(self, test_client, bad_float):
+ with pytest.raises(ValueError, match="Out of range float values are not JSON compliant"):
Review Comment:
This test doesn't actually verify the validator behavior. The matched
message `"Out of range float values are not JSON compliant"` is CPython's
stdlib `json` error message, raised when the request body is serialized with
`allow_nan=False` before the request ever reaches the route. It is not the
validator's `"value must be a finite number; NaN and Inf are not JSON
representable"`.
Consequence: the test passes even if the validator is removed entirely, so
it gives false confidence in the new check. Two ways to fix:
1. To actually exercise the validator, you have to bypass FastAPI's JSON
parsing -- e.g. directly construct `TaskStateBody(value=float("nan"))` and
`pytest.raises(ValidationError, match="value must be a finite number")`.
2. Or, if the intent is to confirm the endpoint rejects non-finite floats
(regardless of layer), assert `response.status_code == 422` instead of
expecting `ValueError` to propagate.
Same issue in `test_asset_state.py:185`.
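
A stdlib-only sketch of where that message comes from, with no validator in
the picture. `serialize_like_a_client` is a hypothetical stand-in for whatever
encodes the request body with `allow_nan=False`; it is not the actual client
code.

```python
import json


def serialize_like_a_client(payload: dict) -> str:
    """Stand-in for a request encoder that forbids NaN/Inf (an assumption)."""
    return json.dumps(payload, allow_nan=False)


def stdlib_error_message(value: float) -> str:
    """Return the ValueError text json.dumps produces, or '' if it succeeds."""
    try:
        serialize_like_a_client({"value": value})
    except ValueError as e:
        return str(e)
    return ""


for bad in (float("nan"), float("inf"), float("-inf")):
    # The message originates in the json module, not in any Pydantic validator.
    print(stdlib_error_message(bad))
```

Because the error is raised during serialization, a test matching on it passes
whether or not `value_is_json_representable` exists.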