Ivan Torres created FLINK-39000:
-----------------------------------
Summary: Avoid redundant seeks during operator list state restore
Key: FLINK-39000
URL: https://issues.apache.org/jira/browse/FLINK-39000
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Affects Versions: 2.1.1, 1.20.3
Reporter: Ivan Torres
*Problem:* Operator state restore performs FSDataInputStream.seek(offset) for
every partition element in
OperatorStateRestoreOperation.deserializeOperatorStateValues, even when offsets
are sequential.
For checkpoints stored on object stores (e.g., S3 or similar), repeated seeks
can be very expensive, often triggering additional range reads or stream
resets. This can dominate restore time for large operator list state (e.g.,
tens or hundreds of thousands of partitions).
*Root cause:* Offsets for PartitionableListState are written sequentially
(captured via out.getPos() before each element is serialized), so after
deserializing one element the stream is typically already positioned at the
next offset; seeking again is redundant.
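As a minimal illustration of why the offsets end up sequential, the Java sketch below mimics the write side without any Flink dependency. It assumes DataOutputStream.size() as a stand-in for out.getPos() and DataOutputStream.writeUTF as a stand-in for the element serializer; the class and method names are hypothetical. It records each element's start offset before writing it, then checks that a reader that has just consumed element i is already positioned at the offset of element i + 1.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: DataOutputStream.size() stands in for out.getPos(),
// and writeUTF stands in for the element TypeSerializer.
final class SequentialOffsetsSketch {

    /** Writes elements back to back, recording each element's start offset. */
    static long[] writeWithOffsets(DataOutputStream out, List<String> elements) throws IOException {
        long[] offsets = new long[elements.size()];
        for (int i = 0; i < elements.size(); i++) {
            offsets[i] = out.size(); // position captured *before* serializing element i
            out.writeUTF(elements.get(i));
        }
        return offsets;
    }

    /** Returns true if the reader is already at offsets[i] before reading each element i. */
    static boolean readerLandsOnNextOffset(byte[] bytes, long[] offsets) throws IOException {
        ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytes);
        DataInputStream in = new DataInputStream(bytesIn); // unbuffered wrapper
        for (long offset : offsets) {
            long pos = bytes.length - bytesIn.available(); // bytes consumed so far
            if (pos != offset) {
                return false; // a seek would actually be needed here
            }
            in.readUTF();
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        long[] offsets = writeWithOffsets(new DataOutputStream(bytes), List.of("a", "bb", "ccc"));
        System.out.println(Arrays.toString(offsets)); // [0, 3, 7]
        System.out.println(readerLandsOnNextOffset(bytes.toByteArray(), offsets)); // true
    }
}
```

Each writeUTF of a short ASCII string emits a 2-byte length prefix plus one byte per character, which is why the offsets are 0, 3 and 7.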
*Proposed change:* Track the current stream position during restore and only
call seek when the stream is not already at the target offset:
* Pseudocode: if (currentPos != offset) in.seek(offset); ... currentPos =
in.getPos();
* Scope: flink-runtime only; no state format or semantics changes.
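The pseudocode in the bullet above can be expanded into a minimal, self-contained Java sketch. It deliberately avoids Flink classes: SeekableInput is a hypothetical stand-in that mirrors the shape of FSDataInputStream (an InputStream with seek(long) and getPos()), and elements are assumed to be serialized with DataOutputStream.writeUTF rather than a Flink TypeSerializer. ByteArraySeekableInput, ConditionalSeekRestore and restoreElements are hypothetical names, not the actual change.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in with the same shape as Flink's FSDataInputStream. */
abstract class SeekableInput extends InputStream {
    abstract void seek(long desired) throws IOException;
    abstract long getPos() throws IOException;
}

/** Hypothetical in-memory seekable stream that also counts seek() calls. */
final class ByteArraySeekableInput extends SeekableInput {
    private final byte[] data;
    private int pos;
    int seekCount;

    ByteArraySeekableInput(byte[] data) { this.data = data; }

    @Override public int read() { return pos < data.length ? (data[pos++] & 0xFF) : -1; }
    @Override void seek(long desired) { seekCount++; pos = (int) desired; }
    @Override long getPos() { return pos; }
}

final class ConditionalSeekRestore {
    /** Restores one element per offset, seeking only when the stream is not already there. */
    static List<String> restoreElements(SeekableInput in, long[] offsets) throws IOException {
        // DataInputStream does not buffer, so in.getPos() always equals the bytes consumed.
        DataInputStream data = new DataInputStream(in);
        List<String> values = new ArrayList<>(offsets.length);
        long currentPos = -1; // position unknown before the first element, so it is always sought
        for (long offset : offsets) {
            if (currentPos != offset) {
                in.seek(offset); // first or non-sequential offset: a real seek is required
            }
            values.add(data.readUTF());
            currentPos = in.getPos(); // stream now sits at the end of this element
        }
        return values;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        String[] elements = {"a", "bb", "ccc"};
        long[] offsets = new long[elements.length];
        for (int i = 0; i < elements.length; i++) {
            offsets[i] = out.size(); // like out.getPos() before each element
            out.writeUTF(elements[i]);
        }
        ByteArraySeekableInput in = new ByteArraySeekableInput(bytes.toByteArray());
        System.out.println(restoreElements(in, offsets) + " seeks=" + in.seekCount); // [a, bb, ccc] seeks=1
    }
}
```

With shuffled offsets such as {7, 3, 0}, the same loop seeks before every element and returns [ccc, bb, a], matching what unconditional seeking would produce. The sketch relies on the reading wrapper being unbuffered; a buffering wrapper would read ahead, so getPos() on the underlying stream would no longer match the logical read position.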
*Expected result:* Restoring operator list state with sequential offsets avoids
redundant seeks (seek count drops from O(numPartitions) to ~1 per state/handle
in the common case). Restored state contents and order are unchanged compared
to current behavior. Additionally, a unit test should be added in flink-runtime
using an FSDataInputStream wrapper that counts seek() calls, to prevent
regressions (sequential offsets → minimal seeks; non-sequential offsets still
seek as needed).
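The proposed regression test could follow the decorator pattern sketched below. To keep it runnable without Flink on the classpath, it is written against a hypothetical PositionedStream abstraction instead of FSDataInputStream; a real test in flink-runtime would wrap an actual FSDataInputStream. The restore loop here is a simplified, hypothetical stand-in that reads fixed-size 4-byte int elements, and all class and method names are assumptions.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for FSDataInputStream: an InputStream with seek/getPos. */
abstract class PositionedStream extends InputStream {
    abstract void seek(long desired) throws IOException;
    abstract long getPos() throws IOException;
}

/** Hypothetical in-memory fixture holding consecutive big-endian ints. */
final class InMemoryPositionedStream extends PositionedStream {
    private final byte[] data;
    private int pos;

    InMemoryPositionedStream(byte[] data) { this.data = data; }

    @Override public int read() { return pos < data.length ? (data[pos++] & 0xFF) : -1; }
    @Override void seek(long desired) { pos = (int) desired; }
    @Override long getPos() { return pos; }
}

/** Decorator that forwards read/seek/getPos to the delegate and counts seek() calls. */
final class SeekCountingStream extends PositionedStream {
    private final PositionedStream delegate;
    int seekCount;

    SeekCountingStream(PositionedStream delegate) { this.delegate = delegate; }

    @Override public int read() throws IOException { return delegate.read(); }
    @Override void seek(long desired) throws IOException { seekCount++; delegate.seek(desired); }
    @Override long getPos() throws IOException { return delegate.getPos(); }
}

final class SeekCountRegressionCheck {
    /** Reads one int per offset, seeking only when the current position differs. */
    static int[] restoreInts(PositionedStream in, long[] offsets) throws IOException {
        DataInputStream data = new DataInputStream(in); // unbuffered, so getPos() stays exact
        int[] values = new int[offsets.length];
        long currentPos = -1; // unknown position: force a seek for the first element
        for (int i = 0; i < offsets.length; i++) {
            if (currentPos != offsets[i]) {
                in.seek(offsets[i]);
            }
            values[i] = data.readInt();
            currentPos = in.getPos();
        }
        return values;
    }

    /** Counts the seeks needed to restore 'offsets' from n consecutive ints (n <= 256). */
    static int countSeeks(int n, long[] offsets) throws IOException {
        byte[] bytes = new byte[4 * n];
        for (int i = 0; i < n; i++) {
            bytes[4 * i + 3] = (byte) i; // element i stores the int value i
        }
        SeekCountingStream counting = new SeekCountingStream(new InMemoryPositionedStream(bytes));
        restoreInts(counting, offsets);
        return counting.seekCount;
    }

    public static void main(String[] args) throws IOException {
        int n = 100;
        long[] sequential = new long[n];
        for (int i = 0; i < n; i++) {
            sequential[i] = 4L * i;
        }
        System.out.println("sequential seeks: " + countSeeks(n, sequential)); // 1
        System.out.println("non-sequential seeks: " + countSeeks(4, new long[] {8, 0, 4, 12})); // 3
    }
}
```

In the non-sequential case, the step from offset 0 to offset 4 is contiguous and needs no seek, so only three of the four reads seek.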
*References:*
* flink/runtime/state/OperatorStateRestoreOperation.java
** Restore path does per-element in.seek(offset) unconditionally
* flink/runtime/state/PartitionableListState.java
** Offsets are captured via out.getPos() before serializing each element →
sequential offsets
* flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
** Snapshot writes operator list state via PartitionableListState.write(...),
typically through CompressibleFSDataOutputStream
* flink/runtime/state/CompressibleFSDataInputStream.java
** Seeking in a compressed stream requires extra work, which makes redundant
seeks even more expensive
--
Happy to contribute on this one :)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)