Ivan Torres created FLINK-39000:
-----------------------------------

             Summary: Avoid redundant seeks during operator list state restore
                 Key: FLINK-39000
                 URL: https://issues.apache.org/jira/browse/FLINK-39000
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Checkpointing
    Affects Versions: 2.1.1, 1.20.3
            Reporter: Ivan Torres


*Problem:* Operator state restore performs FSDataInputStream.seek(offset) for 
every partition element in 
OperatorStateRestoreOperation.deserializeOperatorStateValues, even when offsets 
are sequential.

For checkpoints stored on object stores (e.g., S3 or similar), repeated seeks 
can be very expensive, since each seek often triggers an additional range read 
or a stream reset. This cost can dominate restore time for large operator list 
state (e.g., tens or hundreds of thousands of partitions).

*Root cause:* Offsets for PartitionableListState are written sequentially 
(captured via out.getPos() before each element is serialized), so after 
deserializing one element the stream is typically already positioned at the 
next offset; seeking again is redundant.
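To illustrate why the offsets end up sequential, here is a minimal stand-alone sketch (not Flink code) that captures the position right before each element is written, the same pattern PartitionableListState follows with out.getPos(). Assumptions: java.io.DataOutputStream.size() stands in for getPos(), elements are Strings serialized with writeUTF instead of a TypeSerializer, and the class name SequentialOffsetWriter is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Hypothetical illustration, not Flink code: mimics recording out.getPos()
// before serializing each element. DataOutputStream.size() (bytes written
// so far) plays the role of getPos() here.
final class SequentialOffsetWriter {

    // Writes each element with writeUTF and returns the position captured
    // immediately before each write.
    static long[] writeWithOffsets(List<String> elements, ByteArrayOutputStream sink)
            throws IOException {
        DataOutputStream out = new DataOutputStream(sink);
        long[] offsets = new long[elements.size()];
        for (int i = 0; i < elements.size(); i++) {
            offsets[i] = out.size();       // position before element i
            out.writeUTF(elements.get(i)); // 2-byte length + modified UTF-8 bytes
        }
        out.flush();
        return offsets;
    }
}
```

For "a", "bb" and "ccc" the captured offsets are 0, 3 and 7: each offset is exactly where the previous element ended, so a reader that has just deserialized element i is already positioned at offset i+1.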

*Proposed change:* Track the current stream position during restore and call 
seek only when needed:
 * Pseudocode: if (currentPos != offset) in.seek(offset); ... currentPos = 
in.getPos();
 * Scope: flink-runtime only; no state format or semantics changes.

*Expected result:* Restoring operator list state with sequential offsets avoids 
redundant seeks (seek count drops from O(numPartitions) to ~1 per state/handle 
in the common case). Restored state contents and order are unchanged compared 
to the current behavior. Additionally, a unit test in flink-runtime using an 
FSDataInputStream wrapper that counts seek() calls should guard against 
regressions (sequential offsets → minimal seeks; non-sequential offsets still 
seek as needed).

*References:*
 * flink/runtime/state/OperatorStateRestoreOperation.java
 ** Restore path does per-element in.seek(offset) unconditionally
 * flink/runtime/state/PartitionableListState.java
 ** Offsets are captured via out.getPos() before serializing each element → 
sequential offsets
 * flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
 ** Snapshot writes operator list state via PartitionableListState.write(...), 
typically through CompressibleFSDataOutputStream
 * flink/runtime/state/CompressibleFSDataInputStream.java
 ** Seeking in a compressed stream involves extra work, which makes redundant seeks even more expensive

--

Happy to contribute on this one :)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
