This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push: new f9383e6780a [FLINK-33863] Fix restoring compressed operator state f9383e6780a is described below commit f9383e6780ae8beb995d9bbd58a8484d19900f55 Author: Ruibin Xing <xingrui...@kanyun.com> AuthorDate: Sat Dec 16 14:14:22 2023 +0800 [FLINK-33863] Fix restoring compressed operator state --- .../state/OperatorStateRestoreOperation.java | 30 +++++- .../state/OperatorStateRestoreOperationTest.java | 114 +++++++++++++++++++++ 2 files changed, 142 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java index f818eb81978..fd983fd5d28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java @@ -32,9 +32,12 @@ import org.apache.commons.io.IOUtils; import javax.annotation.Nonnull; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** Implementation of operator state restore operation. */ public class OperatorStateRestoreOperation implements RestoreOperation<Void> { @@ -168,9 +171,32 @@ public class OperatorStateRestoreOperation implements RestoreOperation<Void> { } } + List<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> entries = + new ArrayList<>(stateHandle.getStateNameToPartitionOffsets().entrySet()); + + if (backendSerializationProxy.isUsingStateCompression()) { + // sort state handles by offsets to avoid building SnappyFramedInputStream with + // EOF stream. + entries = + entries.stream() + .sorted( + Comparator.comparingLong( + entry -> { + OperatorStateHandle.StateMetaInfo + stateMetaInfo = entry.getValue(); + long[] offsets = stateMetaInfo.getOffsets(); + if (offsets == null + || offsets.length == 0) { + return Long.MIN_VALUE; + } else { + return offsets[0]; + } + })) + .collect(Collectors.toList()); + } + // Restore all the states - for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : - stateHandle.getStateNameToPartitionOffsets().entrySet()) { + for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : entries) { final String stateName = nameToOffsets.getKey(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java new file mode 100644 index 00000000000..e0aecd5d723 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +/** Tests for the {@link org.apache.flink.runtime.state.OperatorStateRestoreOperation}. */ +public class OperatorStateRestoreOperationTest { + + @Nullable + private static OperatorStateHandle createOperatorStateHandle( + ExecutionConfig cfg, + CloseableRegistry cancelStreamRegistry, + ClassLoader classLoader, + List<String> stateNames, + List<String> broadcastStateNames) + throws Exception { + + try (OperatorStateBackend operatorStateBackend = + new DefaultOperatorStateBackendBuilder( + classLoader, + cfg, + false, + Collections.emptyList(), + cancelStreamRegistry) + .build()) { + CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096); + + for (String stateName : stateNames) { + ListStateDescriptor<String> descriptor = + new ListStateDescriptor<>(stateName, String.class); + PartitionableListState<String> state = + (PartitionableListState<String>) + operatorStateBackend.getListState(descriptor); + state.add("value1"); + } + + for (String broadcastStateName : broadcastStateNames) { + MapStateDescriptor<String, String> descriptor = + new MapStateDescriptor<>(broadcastStateName, String.class, String.class); + BroadcastState<String, String> state = + operatorStateBackend.getBroadcastState(descriptor); + state.put("key1", "value1"); + } + + SnapshotResult<OperatorStateHandle> result = + operatorStateBackend + .snapshot( + 1, + 1, + streamFactory, + CheckpointOptions.forCheckpointWithDefaultLocation()) + .get(); + return result.getJobManagerOwnedSnapshot(); + } + } + + @Test + public void testRestoringMixedOperatorStateWhenSnapshotCompressionIsEnabled() throws Exception { + ExecutionConfig cfg = new ExecutionConfig(); + cfg.setUseSnapshotCompression(true); + CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + ClassLoader classLoader = this.getClass().getClassLoader(); + + OperatorStateHandle handle = + createOperatorStateHandle( + cfg, + cancelStreamRegistry, + classLoader, + Arrays.asList("s1", "s2"), + Collections.singletonList("b2")); + + OperatorStateRestoreOperation operatorStateRestoreOperation = + new OperatorStateRestoreOperation( + cancelStreamRegistry, + classLoader, + new HashMap<>(), + new HashMap<>(), + Collections.singletonList(handle)); + + operatorStateRestoreOperation.restore(); + } +}