This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push: new 5f12f4c [FLINK-23949][checkpoint] Fix first incremental checkpoint after a savepoint degenerate into a full checkpoint 5f12f4c is described below commit 5f12f4ce88a4e83473e778eb30742ab88d92bdb0 Author: wangfeifan <zoltar9...@163.com> AuthorDate: Mon Sep 6 14:43:52 2021 +0800 [FLINK-23949][checkpoint] Fix first incremental checkpoint after a savepoint degenerate into a full checkpoint this fix #16969. Co-authored-by: wangfeifan <zoltar9...@163.com> Co-authored-by: jinghaihang <jinghaihang_hr...@163.com> --- .../snapshot/RocksIncrementalSnapshotStrategy.java | 6 +- .../RocksIncrementalSnapshotStrategyTest.java | 194 +++++++++++++++++++++ 2 files changed, 199 insertions(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java index 8e01a73..59a9c86 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -174,7 +174,11 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy @Override public void notifyCheckpointComplete(long completedCheckpointId) { synchronized (materializedSstFiles) { - if (completedCheckpointId > lastCompletedCheckpointId) { + // FLINK-23949: materializedSstFiles.keySet().contains(completedCheckpointId) make sure + // the notified checkpointId is not a savepoint, otherwise next checkpoint will + // degenerate into a full checkpoint + if (completedCheckpointId > lastCompletedCheckpointId + && 
materializedSstFiles.keySet().contains(completedCheckpointId)) { materializedSstFiles .keySet() .removeIf(checkpointId -> checkpointId < completedCheckpointId); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java new file mode 100644 index 0000000..69142f8 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state.snapshot; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.contrib.streaming.state.RocksDBOptions; +import org.apache.flink.contrib.streaming.state.RocksDBResource; +import org.apache.flink.contrib.streaming.state.RocksDBStateUploader; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.ArrayListSerializer; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; +import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; +import org.apache.flink.util.ResourceGuard; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; + +import static org.apache.flink.core.fs.Path.fromLocalFile; +import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance; + +/** Tests for {@link RocksIncrementalSnapshotStrategy}. 
*/
+public class RocksIncrementalSnapshotStrategyTest {
+
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+    @Rule public final RocksDBResource rocksDBResource = new RocksDBResource();
+
+    /**
+     * Verifies that the checkpoint taken after a completed savepoint is still incremental.
+     *
+     * <p>Checkpoint 1 is snapshotted and confirmed. Completion is then notified for id 2, for
+     * which no snapshot was taken through this strategy; that notification plays the role of the
+     * savepoint. Checkpoint 3 is considered incremental when its shared state contains at least
+     * one {@link PlaceholderStreamStateHandle}.
+     */
+    @Test
+    public void testCheckpointIsIncremental() throws Exception {
+
+        try (CloseableRegistry closeableRegistry = new CloseableRegistry();
+                RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy =
+                        createSnapshotStrategy(closeableRegistry)) {
+            FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();
+
+            // Take checkpoint 1 and confirm it.
+            snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
+            checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+            // Confirm id 2 without snapshotting it here: it stands in for a savepoint.
+            checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+            // Take checkpoint 3.
+            IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
+                    snapshot(
+                            3L,
+                            checkpointSnapshotStrategy,
+                            checkpointStreamFactory,
+                            closeableRegistry);
+
+            // If the 3rd checkpoint's placeholderStateHandleCount > 0, the 3rd checkpoint is
+            // incremental.
+            Map<StateHandleID, StreamStateHandle> sharedState3 =
+                    incrementalRemoteKeyedStateHandle3.getSharedState();
+            long placeholderStateHandleCount =
+                    sharedState3.values().stream()
+                            .filter(PlaceholderStreamStateHandle.class::isInstance)
+                            .count();
+
+            Assert.assertTrue(
+                    "Checkpoint 3 should be incremental but has no placeholder state handle.",
+                    placeholderStateHandleCount > 0);
+        }
+    }
+
+    /**
+     * Builds a {@link RocksIncrementalSnapshotStrategy} on the RocksDB instance provided by
+     * {@link #rocksDBResource}, after writing one entry into a new "test" column family.
+     *
+     * <p>The strategy starts with an empty map of materialized SST files and {@code -1} as the
+     * last completed checkpoint id.
+     *
+     * @param closeableRegistry registry handed to the strategy's constructor
+     * @return the newly created strategy
+     * @throws IOException if the temporary folder (or another setup step) fails with an I/O error
+     * @throws RocksDBException if a RocksDB operation fails
+     */
+    public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy(
+            CloseableRegistry closeableRegistry) throws IOException, RocksDBException {
+
+        ColumnFamilyHandle columnFamilyHandle = rocksDBResource.createNewColumnFamily("test");
+        RocksDB rocksDB = rocksDBResource.getRocksDB();
+        // Explicit charset so the stored bytes do not depend on the platform default.
+        byte[] key = "checkpoint".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        byte[] val = "incrementalTest".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        rocksDB.put(columnFamilyHandle, key, val);
+
+        // Inputs for the RocksIncrementalSnapshotStrategy constructor.
+        long lastCompletedCheckpointId = -1L;
+        ResourceGuard rocksDBResourceGuard = new ResourceGuard();
+        SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
+        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
+                new LinkedHashMap<>();
+
+        RocksDBStateUploader rocksDBStateUploader =
+                new RocksDBStateUploader(
+                        RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue());
+
+        int keyGroupPrefixBytes =
+                CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2);
+
+        RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new ArrayListSerializer<>(IntSerializer.INSTANCE));
+
+        // The map was created just above, so a plain put is sufficient.
+        kvStateInformation.put(
+                "test",
+                new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfo));
+
+        return new RocksIncrementalSnapshotStrategy<>(
+                rocksDB,
+                rocksDBResourceGuard,
+                IntSerializer.INSTANCE,
+                kvStateInformation,
+                new KeyGroupRange(0, 1),
+                keyGroupPrefixBytes,
+                TestLocalRecoveryConfig.disabled(),
+                closeableRegistry,
+                tmp.newFolder(),
+                UUID.randomUUID(),
+                materializedSstFiles,
+                rocksDBStateUploader,
+                lastCompletedCheckpointId);
+    }
+
+    /**
+     * Creates a {@link FsCheckpointStreamFactory} on the shared local file system instance whose
+     * checkpoint and shared-state directories are fresh folders under {@link #tmp}.
+     *
+     * @return the stream factory used for the snapshots in this test
+     * @throws IOException if one of the temporary folders cannot be created
+     */
+    public FsCheckpointStreamFactory createFsCheckpointStreamFactory() throws IOException {
+        // The same value is passed for both int constructor arguments.
+        int threshold = 100;
+        File checkpointsDir = tmp.newFolder("checkpointsDir");
+        File sharedStateDir = tmp.newFolder("sharedStateDir");
+        return new FsCheckpointStreamFactory(
+                getSharedInstance(),
+                fromLocalFile(checkpointsDir),
+                fromLocalFile(sharedStateDir),
+                threshold,
+                threshold);
+    }
+
+    /**
+     * Calls {@code syncPrepareResources} followed by {@code asyncSnapshot} for the given
+     * checkpoint id and returns the resulting job-manager-owned state handle.
+     *
+     * @param checkpointId id of the checkpoint to take; also passed as the second {@code long}
+     *     argument of {@code asyncSnapshot} (presumably the timestamp; confirm against its
+     *     signature)
+     * @param checkpointSnapshotStrategy strategy used to take the snapshot
+     * @param checkpointStreamFactory stream factory passed to {@code asyncSnapshot}
+     * @param closeableRegistry registry passed when retrieving the snapshot result
+     * @return the job-manager-owned snapshot, cast to {@link IncrementalRemoteKeyedStateHandle}
+     * @throws Exception if any step of the snapshot fails
+     */
+    public IncrementalRemoteKeyedStateHandle snapshot(
+            long checkpointId,
+            RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy,
+            FsCheckpointStreamFactory checkpointStreamFactory,
+            CloseableRegistry closeableRegistry)
+            throws Exception {
+
+        RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources snapshotResources =
+                checkpointSnapshotStrategy.syncPrepareResources(checkpointId);
+
+        return (IncrementalRemoteKeyedStateHandle)
+                checkpointSnapshotStrategy
+                        .asyncSnapshot(
+                                snapshotResources,
+                                checkpointId,
+                                checkpointId,
+                                checkpointStreamFactory,
+                                CheckpointOptions.forCheckpointWithDefaultLocation())
+                        .get(closeableRegistry)
+                        .getJobManagerOwnedSnapshot();
+    }
+}