kaxil commented on code in PR #68133:
URL: https://github.com/apache/airflow/pull/68133#discussion_r3392246837
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -3239,6 +3239,21 @@ state_store:
type: integer
example: "10000"
default: "0"
+ max_value_storage_bytes:
+ description: |
+ Only applicable to MetastoreStoreBackend. Maximum size in bytes that a
single task or asset store
+ value written via the core or execution API can have. Values that
exceed this limit are rejected
Review Comment:
The first paragraph says values written via the core or execution API are
rejected at the API boundary, but the execution API models don't have a size
check (the next paragraph says the worker write is allowed through). Worth
rewording this to core API only. The description also doesn't mention that `0`
disables the limit, and the last line has a typo, "use a custom state backends".
##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_store.py:
##########
@@ -66,8 +65,12 @@ def value_is_json_representable(cls, v: JsonValue) ->
JsonValue:
serialized = json.dumps(v, allow_nan=False)
except ValueError:
raise ValueError("value contains non-finite numbers; NaN and Inf
are not JSON representable")
- if len(serialized) > _MAX_SERIALIZED_BYTES:
- raise ValueError(f"value exceeds maximum serialized size of
{_MAX_SERIALIZED_BYTES} bytes")
+ limit = conf.getint("state_store", "max_value_storage_bytes")
+ if limit > 0 and len(serialized) > limit:
+ raise ValueError(
+ f"value exceeds max_value_storage_bytes ({limit}); "
+ "for large payloads configure a custom [state_store] backend"
Review Comment:
Following this advice won't get past the 422: the validator runs regardless
of which `[state_store] backend` is configured, so a REST caller's only remedy
is raising `max_value_storage_bytes` (or setting it to 0). That also makes the
config description's "Only applicable to MetastoreStoreBackend" inaccurate.
Should the check be skipped when a custom backend is configured, or should the
message just point at the config option instead?
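
If the message option is taken, it could name the config option directly. A minimal sketch, assuming a hypothetical standalone helper `check_value_size` in place of the pydantic validator, with the limit passed in as a parameter rather than read from `conf`:

```python
import json


def check_value_size(value, limit: int) -> str:
    """Hypothetical stand-in for the validator body; returns the serialized value."""
    # allow_nan=False mirrors the existing validator in the diff above.
    serialized = json.dumps(value, allow_nan=False)
    # A limit of 0 (or less) disables the check, matching the `limit > 0` gating.
    if limit > 0 and len(serialized) > limit:
        raise ValueError(
            f"value exceeds [state_store] max_value_storage_bytes ({limit}); "
            "raise the limit or set it to 0 to disable the check"
        )
    return serialized
```

The wording of the message is only a suggestion; the point is that it names a remedy a REST caller can actually apply.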
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -600,7 +601,21 @@ def set(self, key: str, value: JsonValue, *, retention:
timedelta | None = None)
# wrap the value with a marker to indicate that it's stored
externally, and include the ref to the external storage
stored = _wrap_external_ref(ref)
- SUPERVISOR_COMMS.send(SetTaskStore(ti_id=self._ti_id, key=key,
value=stored, expires_at=expires_at))
+ msg = SetTaskStore(ti_id=self._ti_id, key=key, value=stored,
expires_at=expires_at)
+
+ limit = conf.getint("state_store", "max_value_storage_bytes")
+ if limit > 0:
+ serialized_size = len(json.dumps(stored))
+ if serialized_size > limit:
+ log.warning(
+ "Task store value for key %r is %d bytes, which exceeds
configured max_value_storage_bytes=%d. "
+ "Consider using a custom [state_store] backend to offload
large payloads.",
Review Comment:
`[state_store] backend` is the server-side backend and doesn't help here.
The backend that offloads large payloads from a task is `[workers]
state_store_backend`, which is also what stops this warning firing since
`stored` becomes a small ref. The same incorrect section name appears in the
asset store warning below and in the API error messages.
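
To illustrate why the worker-side backend is what silences the warning: the size check runs on `stored`, which is the wrapped ref once a payload is offloaded. A rough sketch, where `wrap_external_ref`, its marker key, and the ref string are hypothetical stand-ins (the real `_wrap_external_ref` format is not shown in this diff):

```python
import json

LIMIT = 65535  # assumed value of max_value_storage_bytes


def wrap_external_ref(ref: str) -> dict:
    # Hypothetical marker format; the real wrapper may differ.
    return {"__external_ref__": ref}


def exceeds_limit(stored, limit: int = LIMIT) -> bool:
    # Same gating as the diff: a limit of 0 disables the check.
    # json.dumps escapes non-ASCII by default, so len() equals the byte count.
    return limit > 0 and len(json.dumps(stored)) > limit


large_payload = {"rows": ["x" * 100] * 1000}  # roughly 100 KB once serialized

warns_inline = exceeds_limit(large_payload)
warns_offloaded = exceeds_limit(wrap_external_ref("placeholder://bucket/key"))
```

Here `warns_inline` is true and `warns_offloaded` is false, since only the small ref is measured in the second case.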
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py:
##########
@@ -305,6 +305,12 @@ def test_put_overwrites_expiry_on_existing_key(self,
test_client, time_machine):
assert resp["value"] == "v2"
assert resp["expires_at"] is None
+ def test_put_value_over_limit_returns_422(self, test_client):
+ # default is 65536 bytes
Review Comment:
Default is 65535, not 65536. Also, all the API tests cover only the default
limit; a `conf_vars` case (e.g. `max_value_storage_bytes=0` accepting a large
value) would actually exercise the new configurability and the `limit > 0`
gating.
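
The cases worth covering can be sketched without the test client. `over_limit` below is a hypothetical stand-in for the validator's gating; in the real test the limit would be set through `conf_vars` rather than passed as a parameter:

```python
import json

DEFAULT_LIMIT = 65535  # default per the comment above


def over_limit(value, limit: int) -> bool:
    # Hypothetical helper mirroring `limit > 0 and len(serialized) > limit`.
    return limit > 0 and len(json.dumps(value)) > limit


# A JSON string gains two quote characters when serialized,
# so n characters of "x" serialize to n + 2 bytes.
at_limit = "x" * (DEFAULT_LIMIT - 2)
one_over = "x" * (DEFAULT_LIMIT - 1)

cases = {
    "at default limit": over_limit(at_limit, DEFAULT_LIMIT),
    "one byte over": over_limit(one_over, DEFAULT_LIMIT),
    "limit disabled": over_limit(one_over * 4, 0),
}
```

Only the "one byte over" case should be rejected; the other two should pass.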
--
This is an automated message from the Apache Git Service.