amoghrajesh commented on code in PR #68133:
URL: https://github.com/apache/airflow/pull/68133#discussion_r3400606504
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -3239,6 +3239,21 @@ state_store:
type: integer
example: "10000"
default: "0"
+ max_value_storage_bytes:
+ description: |
+ Only applicable to MetastoreStoreBackend. Maximum size in bytes that a
single task or asset store
+ value written via the core or execution API can have. Values that
exceed this limit are rejected
+ at the API boundary.
+
+ Workers writing via the execution API log a warning recommending a custom
backend when this limit
+ is exceeded, but the write is still allowed to avoid interrupting a task
mid-execution.
+
+ The default of 65535 bytes (one byte under 64 KiB) is appropriate for coordination
state values such as
+ job IDs, cursors, and small status maps. For larger payloads, use a
custom state backend.
+ version_added: 3.3.0
+ type: integer
+ example: "1048576"
+ default: "65535"
Review Comment:
Handled in [comments from
kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py:
##########
@@ -305,6 +305,12 @@ def test_put_overwrites_expiry_on_existing_key(self,
test_client, time_machine):
assert resp["value"] == "v2"
assert resp["expires_at"] is None
+ def test_put_value_over_limit_returns_422(self, test_client):
+ # default is 65535 bytes
Review Comment:
Handled in [comments from
kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -600,7 +601,21 @@ def set(self, key: str, value: JsonValue, *, retention:
timedelta | None = None)
# wrap the value with a marker to indicate that it's stored
externally, and include the ref to the external storage
stored = _wrap_external_ref(ref)
- SUPERVISOR_COMMS.send(SetTaskStore(ti_id=self._ti_id, key=key,
value=stored, expires_at=expires_at))
+ msg = SetTaskStore(ti_id=self._ti_id, key=key, value=stored,
expires_at=expires_at)
+
+ limit = conf.getint("state_store", "max_value_storage_bytes")
+ if limit > 0:
+ serialized_size = len(json.dumps(stored))
+ if serialized_size > limit:
+ log.warning(
+ "Task store value for key %r is %d bytes, which exceeds
configured max_value_storage_bytes=%d. "
+ "Consider using a custom [state_store] backend to offload
large payloads.",
Review Comment:
Handled in [comments from
kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)
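
For reference, the warn-but-allow behavior in this hunk can be sketched in isolation. This is a minimal sketch under stated assumptions, not the code in the PR: `warn_if_oversized` is a hypothetical helper name, and the limit is passed in as an argument instead of being read from `conf`.

```python
import json
import logging

log = logging.getLogger(__name__)


def warn_if_oversized(key: str, stored: object, limit: int) -> bool:
    """Hypothetical helper: warn about an oversized value but never block the write.

    Returns True if a warning was logged, False otherwise.
    """
    # A non-positive limit disables the check, mirroring `if limit > 0` in the hunk.
    if limit <= 0:
        return False
    # Measure the UTF-8 byte length of the serialized value, not its character count.
    serialized_size = len(json.dumps(stored).encode("utf-8"))
    if serialized_size > limit:
        log.warning(
            "Task store value for key %r is %d bytes, which exceeds "
            "configured max_value_storage_bytes=%d.",
            key,
            serialized_size,
            limit,
        )
        return True
    return False
```

The caller would send the message either way; the return value only exists here so the sketch can be checked.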
##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_store.py:
##########
@@ -66,8 +65,12 @@ def value_is_json_representable(cls, v: JsonValue) ->
JsonValue:
serialized = json.dumps(v, allow_nan=False)
except ValueError:
raise ValueError("value contains non-finite numbers; NaN and Inf
are not JSON representable")
- if len(serialized) > _MAX_SERIALIZED_BYTES:
- raise ValueError(f"value exceeds maximum serialized size of
{_MAX_SERIALIZED_BYTES} bytes")
+ limit = conf.getint("state_store", "max_value_storage_bytes")
+ if limit > 0 and len(serialized) > limit:
+ raise ValueError(
+ f"value exceeds max_value_storage_bytes ({limit}); "
+ "for large payloads configure a custom [state_store] backend"
Review Comment:
Handled in [comments from
kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)
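
The reject-at-the-boundary check in this hunk can likewise be sketched as a standalone function. This is an illustration only: `validate_value_size` is a hypothetical name, and the limit is a plain argument here, whereas the PR reads it with `conf.getint("state_store", "max_value_storage_bytes")`.

```python
import json


def validate_value_size(value: object, limit: int) -> object:
    """Hypothetical validator: reject non-finite numbers and oversized values."""
    try:
        # allow_nan=False makes json.dumps raise ValueError on NaN and Infinity.
        serialized = json.dumps(value, allow_nan=False)
    except ValueError as exc:
        raise ValueError(
            "value contains non-finite numbers; NaN and Inf are not JSON representable"
        ) from exc
    # json.dumps escapes non-ASCII characters by default, so the character count
    # and the UTF-8 byte count coincide; encoding explicitly keeps the check
    # byte-accurate even if ensure_ascii were turned off.
    size = len(serialized.encode("utf-8"))
    # A limit of 0 (or below) disables the size check.
    if limit > 0 and size > limit:
        raise ValueError(
            f"value exceeds max_value_storage_bytes ({limit}); "
            "for large payloads configure a custom [state_store] backend"
        )
    return value
```

Unlike the worker-side path, this variant raises, which a FastAPI/Pydantic validator would surface as a 422 response.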
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -3239,6 +3239,21 @@ state_store:
type: integer
example: "10000"
default: "0"
+ max_value_storage_bytes:
+ description: |
+ Only applicable to MetastoreStoreBackend. Maximum size in bytes that a
single task or asset store
+ value written via the core or execution API can have. Values that
exceed this limit are rejected
Review Comment:
Handled in [comments from
kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)