This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b1f29f682b4 [FLINK-39000] Avoid redundant seeks during operator list state restore
b1f29f682b4 is described below
commit b1f29f682b4a40d6163def2b9a5edebae6dbc683
Author: ivan.torres <[email protected]>
AuthorDate: Mon Apr 13 04:42:48 2026 -0400
[FLINK-39000] Avoid redundant seeks during operator list state restore
Co-authored-by: ivan.torres <[email protected]>
---
.../state/OperatorStateRestoreOperation.java | 10 +-
.../state/OperatorStateRestoreOperationTest.java | 140 +++++++++++++++++++++
2 files changed, 147 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
index e7ba144d5b7..602baec35bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
@@ -225,15 +225,19 @@ public class OperatorStateRestoreOperation implements RestoreOperation<Void> {
OperatorStateHandle.StateMetaInfo metaInfo)
throws IOException {
- if (null != metaInfo) {
+ if (metaInfo != null) {
long[] offsets = metaInfo.getOffsets();
- if (null != offsets) {
+ if (offsets != null) {
DataInputView div = new DataInputViewStreamWrapper(in);
TypeSerializer<S> serializer =
stateListForName.getStateMetaInfo().getPartitionStateSerializer();
+ long currentPos = in.getPos();
for (long offset : offsets) {
- in.seek(offset);
+ if (currentPos != offset) {
+ in.seek(offset);
+ }
stateListForName.add(serializer.deserialize(div));
+ currentPos = in.getPos();
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
index a22d7e30c39..bfc52133e81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import
org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
@@ -31,6 +32,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.utility.ThrowingFunction;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -39,6 +41,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -268,6 +272,39 @@ public class OperatorStateRestoreOperationTest {
}
}
+ /**
+ * Verifies that restoring a list state whose element offsets are sequential performs at most
+ * one {@code seek} on the input streams opened from the state handle, with snapshot
+ * compression both enabled and disabled.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testRestoreAvoidsRedundantSeeksForSequentialOffsets(boolean snapshotCompressionEnabled)
+ throws Exception {
+ final ExecutionConfig cfg = new ExecutionConfig();
+ cfg.setUseSnapshotCompression(snapshotCompressionEnabled);
+ // NOTE(review): the CloseableRegistry created inline here is never closed by this test.
+ final ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend>
+ operatorStateBackendFactory =
+ createOperatorStateBackendFactory(
+ cfg, new CloseableRegistry(), this.getClass().getClassLoader());
+
+ // A single list state "s1" holding the 100 elements "v0" .. "v99".
+ final Map<String, List<String>> listStates = new HashMap<>();
+ final List<String> values =
+ IntStream.range(0, 100).mapToObj(idx -> "v" + idx).collect(Collectors.toList());
+ listStates.put("s1", values);
+
+ final OperatorStateHandle stateHandle =
+ createOperatorStateHandle(
+ operatorStateBackendFactory, listStates, Collections.emptyMap());
+
+ // Re-wrap the snapshot bytes so that every seek() during restore increments seekCount.
+ final AtomicInteger seekCount = new AtomicInteger();
+ final OperatorStateHandle stateHandleWithCountingSeeks =
+ wrapWithCountingSeekInputStream(stateHandle, seekCount);
+
+ // Restore from the counting handle. listStates is presumably the expected content;
+ // verifyOperatorStateHandle is defined outside this diff -- confirm.
+ verifyOperatorStateHandle(
+ operatorStateBackendFactory,
+ Collections.singletonList(stateHandleWithCountingSeeks),
+ listStates,
+ Collections.emptyMap());
+
+ // NOTE(review): one seek is tolerated rather than zero, presumably to position the
+ // stream at the first element offset; confirm against the restore code path.
+ assertThat(seekCount.get()).isLessThanOrEqualTo(1);
+ }
+
/**
* This is a simplified version of what RR partitioner does, so it only
works in case there is
* no remainder.
@@ -286,4 +323,107 @@ public class OperatorStateRestoreOperationTest {
}
return newStates;
}
+
+ /**
+ * Returns an {@link OperatorStreamStateHandle} with the same state-name-to-offsets mapping as
+ * {@code stateHandle}, whose delegate is a {@link CountingStreamStateHandle} over the same
+ * bytes, so that each {@code seek} on a stream opened from it increments {@code seekCount}.
+ *
+ * @param stateHandle handle to wrap; its delegate must expose its bytes in memory
+ * @param seekCount counter shared by every stream opened from the returned handle
+ * @return a handle that reads the same bytes while counting seeks
+ * @throws IllegalStateException if the delegate's bytes are not available in memory
+ */
+ private static OperatorStateHandle wrapWithCountingSeekInputStream(
+ OperatorStateHandle stateHandle, AtomicInteger seekCount) {
+ final StreamStateHandle delegate = stateHandle.getDelegateStateHandle();
+ final byte[] data =
+ delegate.asBytesIfInMemory()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected in-memory state handle for test."));
+ // Keep the original offsets and physical handle ID; only the byte source is replaced.
+ return new OperatorStreamStateHandle(
+ stateHandle.getStateNameToPartitionOffsets(),
+ new CountingStreamStateHandle(data, seekCount, delegate.getStreamStateHandleID()));
+ }
+
+ /**
+ * In-memory {@link StreamStateHandle} used by the seek-counting test. Each call to {@link
+ * #openInputStream()} returns a fresh {@link CountingByteArrayFSDataInputStream} over the same
+ * bytes and the same shared {@code seekCount}.
+ */
+ private static final class CountingStreamStateHandle implements StreamStateHandle {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Snapshot bytes; the array is stored as passed in, not copied. */
+ private final byte[] data;
+ /** Counter handed to every stream opened from this handle. */
+ private final AtomicInteger seekCount;
+ /** ID returned from {@link #getStreamStateHandleID()}. */
+ private final PhysicalStateHandleID streamStateHandleId;
+
+ private CountingStreamStateHandle(
+ byte[] data, AtomicInteger seekCount, PhysicalStateHandleID streamStateHandleId) {
+ this.data = data;
+ this.seekCount = seekCount;
+ this.streamStateHandleId = streamStateHandleId;
+ }
+
+ /** Opens a new stream positioned at offset 0 that shares this handle's seek counter. */
+ @Override
+ public FSDataInputStream openInputStream() {
+ return new CountingByteArrayFSDataInputStream(data, seekCount);
+ }
+
+ /** Always present: this handle is backed by an in-memory array. */
+ @Override
+ public Optional<byte[]> asBytesIfInMemory() {
+ return Optional.of(data);
+ }
+
+ @Override
+ public PhysicalStateHandleID getStreamStateHandleID() {
+ return streamStateHandleId;
+ }
+
+ /** No-op: the handle holds only in-memory fields, so there is nothing to delete. */
+ @Override
+ public void discardState() {}
+
+ /** Size is the length of the backing byte array. */
+ @Override
+ public long getStateSize() {
+ return data.length;
+ }
+
+ /** Reports the full size under {@link StateObjectLocation#LOCAL_MEMORY}. */
+ @Override
+ public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+ collector.add(StateObjectLocation.LOCAL_MEMORY, getStateSize());
+ }
+ }
+
+ /**
+ * Seekable in-memory {@link FSDataInputStream} over a byte array that increments a shared
+ * counter on every {@link #seek(long)} call.
+ *
+ * <p>NOTE(review): {@code available()} and {@code skip(long)} are not overridden here, so the
+ * inherited implementations apply.
+ */
+ private static final class CountingByteArrayFSDataInputStream extends FSDataInputStream {
+
+ private final byte[] data;
+ private final AtomicInteger seekCount;
+ /** Index of the next byte to read; also the value reported by {@link #getPos()}. */
+ private int index;
+
+ private CountingByteArrayFSDataInputStream(byte[] data, AtomicInteger seekCount) {
+ this.data = data;
+ this.seekCount = seekCount;
+ }
+
+ /**
+ * Moves the read position to {@code desired}. The call is counted before the bounds check,
+ * so a rejected seek is still counted.
+ *
+ * @throws IOException if {@code desired} is outside {@code [0, data.length]}
+ */
+ @Override
+ public void seek(long desired) throws IOException {
+ seekCount.incrementAndGet();
+ if (desired >= 0 && desired <= data.length) {
+ // Safe narrowing: desired is bounded by data.length, an int.
+ index = (int) desired;
+ } else {
+ throw new IOException("position out of bounds");
+ }
+ }
+
+ @Override
+ public long getPos() {
+ return index;
+ }
+
+ /** Returns the next byte as an unsigned value (0-255), or -1 once all data is consumed. */
+ @Override
+ public int read() {
+ return index < data.length ? data[index++] & 0xFF : -1;
+ }
+
+ /**
+ * Copies up to {@code len} bytes into {@code b} at {@code off} and returns the count copied.
+ * At end of data it returns -1, or 0 when {@code len} is 0.
+ *
+ * <p>NOTE(review): {@code off}/{@code len} are not validated explicitly; invalid ranges are
+ * rejected only by {@code System.arraycopy}, i.e. only while unread data remains.
+ */
+ @Override
+ public int read(byte[] b, int off, int len) {
+ final int bytesLeft = data.length - index;
+ if (bytesLeft > 0) {
+ final int bytesToCopy = Math.min(len, bytesLeft);
+ System.arraycopy(data, index, b, off, bytesToCopy);
+ index += bytesToCopy;
+ return bytesToCopy;
+ } else {
+ return len == 0 ? 0 : -1;
+ }
+ }
+ }
}