kaxil commented on code in PR #68133:
URL: https://github.com/apache/airflow/pull/68133#discussion_r3392246837


##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -3239,6 +3239,21 @@ state_store:
       type: integer
       example: "10000"
       default: "0"
+    max_value_storage_bytes:
+      description: |
+        Only applicable to MetastoreStoreBackend. Maximum size in bytes that a single task or asset store
+        value written via the core or execution API can have. Values that exceed this limit are rejected

Review Comment:
   The first paragraph says values written via the core or execution API are 
rejected at the API boundary, but the execution API models don't have a size 
check (the next paragraph says the worker write is allowed through). It's worth 
rewording this to say core API only. The description also doesn't mention that 
`0` disables the limit, and the last line has a typo, "use a custom state backends".
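For reference, here is a minimal sketch of the semantics the description should spell out. It assumes the gating stays as `limit > 0 and len(serialized) > limit` from the core API validator in this PR. `exceeds_limit` is a hypothetical helper for illustration, not code from the PR:

```python
import json


def exceeds_limit(value, limit: int) -> bool:
    """Return True if the JSON-serialized value is over the limit; a limit of 0 disables the check."""
    # json.dumps escapes non-ASCII by default (ensure_ascii=True), so len() equals the byte count
    serialized = json.dumps(value, allow_nan=False)
    return limit > 0 and len(serialized) > limit


# With the limit disabled, even a 100,002-byte payload passes
assert not exceeds_limit("x" * 100_000, 0)
assert exceeds_limit("x" * 100_000, 65535)
```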



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_store.py:
##########
@@ -66,8 +65,12 @@ def value_is_json_representable(cls, v: JsonValue) -> JsonValue:
             serialized = json.dumps(v, allow_nan=False)
         except ValueError:
             raise ValueError("value contains non-finite numbers; NaN and Inf are not JSON representable")
-        if len(serialized) > _MAX_SERIALIZED_BYTES:
-            raise ValueError(f"value exceeds maximum serialized size of {_MAX_SERIALIZED_BYTES} bytes")
+        limit = conf.getint("state_store", "max_value_storage_bytes")
+        if limit > 0 and len(serialized) > limit:
+            raise ValueError(
+                f"value exceeds max_value_storage_bytes ({limit}); "
+                "for large payloads configure a custom [state_store] backend"

Review Comment:
   Following this advice won't get past the 422: the validator runs regardless 
of which `[state_store] backend` is configured, so a REST caller's only remedy 
is raising `max_value_storage_bytes` (or setting it to 0). That also makes the 
config description's "Only applicable to MetastoreStoreBackend" inaccurate. 
Should the check be skipped when a custom backend is configured, or should the 
message just point at the config option instead?
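If the second option is preferred, the validator could look roughly like this. This is only a sketch: `validate_size` is a hypothetical standalone function rather than the Pydantic validator in `task_store.py`, and the limit is passed in instead of being read via `conf.getint`:

```python
import json


def validate_size(value, limit: int):
    """Same check as the PR, but the error names the remedy a REST caller can actually use."""
    serialized = json.dumps(value, allow_nan=False)
    if limit > 0 and len(serialized) > limit:
        raise ValueError(
            f"value is {len(serialized)} bytes, which exceeds "
            f"[state_store] max_value_storage_bytes ({limit}); "
            "raise the limit or set it to 0 to disable the check"
        )
    return value
```

Skipping the check for non-metastore backends would instead need some way to detect the configured backend inside the validator, which this sketch doesn't attempt.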



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -600,7 +601,21 @@ def set(self, key: str, value: JsonValue, *, retention: timedelta | None = None)
             # wrap the value with a marker to indicate that it's stored externally, and include the ref to the external storage
             stored = _wrap_external_ref(ref)
 
-        SUPERVISOR_COMMS.send(SetTaskStore(ti_id=self._ti_id, key=key, value=stored, expires_at=expires_at))
+        msg = SetTaskStore(ti_id=self._ti_id, key=key, value=stored, expires_at=expires_at)
+
+        limit = conf.getint("state_store", "max_value_storage_bytes")
+        if limit > 0:
+            serialized_size = len(json.dumps(stored))
+            if serialized_size > limit:
+                log.warning(
+                    "Task store value for key %r is %d bytes, which exceeds configured max_value_storage_bytes=%d. "
+                    "Consider using a custom [state_store] backend to offload large payloads.",

Review Comment:
   `[state_store] backend` is the server-side backend and doesn't help here. 
The backend that offloads large payloads from a task is `[workers] 
state_store_backend`, which is also what stops this warning firing, since 
`stored` becomes a small ref. The same wrong section name appears in the asset 
store warning below and in the API error messages.
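Here is roughly what the corrected worker-side warning could look like, as a standalone sketch. `warn_if_oversized` is a hypothetical helper, and the `{"__external_ref__": ...}` dict is an invented stand-in for whatever `_wrap_external_ref` actually returns. The point is that once `[workers] state_store_backend` offloads the payload, the serialized ref is small and the warning no longer fires:

```python
import json
import logging

log = logging.getLogger(__name__)


def warn_if_oversized(key: str, stored, limit: int) -> bool:
    """Warn (pointing at the worker-side backend option) if the stored value is over the limit."""
    if limit <= 0:
        return False  # 0 disables the check
    serialized_size = len(json.dumps(stored))
    if serialized_size <= limit:
        return False
    log.warning(
        "Task store value for key %r is %d bytes, which exceeds configured max_value_storage_bytes=%d. "
        "Consider configuring [workers] state_store_backend to offload large payloads.",
        key,
        serialized_size,
        limit,
    )
    return True


# A large inline value trips the warning...
assert warn_if_oversized("big", "x" * 70_000, 65535)
# ...but an offloaded value is just a small ref (hypothetical shape), so it does not
assert not warn_if_oversized("big", {"__external_ref__": "s3://bucket/key"}, 65535)
```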



##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py:
##########
@@ -305,6 +305,12 @@ def test_put_overwrites_expiry_on_existing_key(self, test_client, time_machine):
         assert resp["value"] == "v2"
         assert resp["expires_at"] is None
 
+    def test_put_value_over_limit_returns_422(self, test_client):
+        # default is 65536 bytes

Review Comment:
   Default is 65535, not 65536. Also, all the API tests cover only the default 
limit; a `conf_vars` case (e.g. `max_value_storage_bytes=0` accepting a large 
value) would actually exercise the new configurability and the `limit > 0` 
gating.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
