This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 5f12f4c  [FLINK-23949][checkpoint] Fix first incremental checkpoint 
after a savepoint degenerate into a full checkpoint
5f12f4c is described below

commit 5f12f4ce88a4e83473e778eb30742ab88d92bdb0
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Mon Sep 6 14:43:52 2021 +0800

    [FLINK-23949][checkpoint] Fix first incremental checkpoint after a 
savepoint degenerate into a full checkpoint
    
    this fix #16969.
    
    Co-authored-by: wangfeifan <zoltar9...@163.com>
    Co-authored-by: jinghaihang <jinghaihang_hr...@163.com>
---
 .../snapshot/RocksIncrementalSnapshotStrategy.java |   6 +-
 .../RocksIncrementalSnapshotStrategyTest.java      | 194 +++++++++++++++++++++
 2 files changed, 199 insertions(+), 1 deletion(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 8e01a73..59a9c86 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -174,7 +174,11 @@ public class RocksIncrementalSnapshotStrategy<K> extends 
RocksDBSnapshotStrategy
     @Override
     public void notifyCheckpointComplete(long completedCheckpointId) {
         synchronized (materializedSstFiles) {
-            if (completedCheckpointId > lastCompletedCheckpointId) {
+            // FLINK-23949: 
materializedSstFiles.keySet().contains(completedCheckpointId) make sure
+            // the notified checkpointId is not a savepoint, otherwise next 
checkpoint will
+            // degenerate into a full checkpoint
+            if (completedCheckpointId > lastCompletedCheckpointId
+                    && 
materializedSstFiles.keySet().contains(completedCheckpointId)) {
                 materializedSstFiles
                         .keySet()
                         .removeIf(checkpointId -> checkpointId < 
completedCheckpointId);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
new file mode 100644
index 0000000..69142f8
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBResource;
+import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.util.ResourceGuard;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+
+/** Tests for {@link RocksIncrementalSnapshotStrategy}. */
+public class RocksIncrementalSnapshotStrategyTest {
+
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+    @Rule public RocksDBResource rocksDBResource = new RocksDBResource();
+
+    // Verify the next checkpoint is still incremental after a savepoint 
completed.
+    @Test
+    public void testCheckpointIsIncremental() throws Exception {
+
+        try (CloseableRegistry closeableRegistry = new CloseableRegistry();
+                RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
+                        createSnapshotStrategy(closeableRegistry)) {
+            FsCheckpointStreamFactory checkpointStreamFactory = 
createFsCheckpointStreamFactory();
+
+            // make and notify checkpoint with id 1
+            snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, 
closeableRegistry);
+            checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+            // notify savepoint with id 2
+            checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+            // make checkpoint with id 3
+            IncrementalRemoteKeyedStateHandle 
incrementalRemoteKeyedStateHandle3 =
+                    snapshot(
+                            3L,
+                            checkpointSnapshotStrategy,
+                            checkpointStreamFactory,
+                            closeableRegistry);
+
+            // If 3rd checkpoint's placeholderStateHandleCount > 0,it means 
3rd checkpoint is
+            // incremental.
+            Map<StateHandleID, StreamStateHandle> sharedState3 =
+                    incrementalRemoteKeyedStateHandle3.getSharedState();
+            long placeholderStateHandleCount =
+                    sharedState3.entrySet().stream()
+                            .filter(e -> e.getValue() instanceof 
PlaceholderStreamStateHandle)
+                            .count();
+
+            Assert.assertTrue(placeholderStateHandleCount > 0);
+        }
+    }
+
+    public RocksIncrementalSnapshotStrategy createSnapshotStrategy(
+            CloseableRegistry closeableRegistry) throws IOException, 
RocksDBException {
+
+        ColumnFamilyHandle columnFamilyHandle = 
rocksDBResource.createNewColumnFamily("test");
+        RocksDB rocksDB = rocksDBResource.getRocksDB();
+        byte[] key = "checkpoint".getBytes();
+        byte[] val = "incrementalTest".getBytes();
+        rocksDB.put(columnFamilyHandle, key, val);
+
+        // construct RocksIncrementalSnapshotStrategy
+        long lastCompletedCheckpointId = -1L;
+        ResourceGuard rocksDBResourceGuard = new ResourceGuard();
+        SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new 
TreeMap<>();
+        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation =
+                new LinkedHashMap<>();
+
+        RocksDBStateUploader rocksDBStateUploader =
+                new RocksDBStateUploader(
+                        
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue());
+
+        int keyGroupPrefixBytes =
+                
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2);
+
+        RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> 
metaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new ArrayListSerializer<>(IntSerializer.INSTANCE));
+
+        RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo =
+                new 
RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfo);
+        kvStateInformation.putIfAbsent("test", rocksDbKvStateInfo);
+
+        RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
+                new RocksIncrementalSnapshotStrategy(
+                        rocksDB,
+                        rocksDBResourceGuard,
+                        IntSerializer.INSTANCE,
+                        kvStateInformation,
+                        new KeyGroupRange(0, 1),
+                        keyGroupPrefixBytes,
+                        TestLocalRecoveryConfig.disabled(),
+                        closeableRegistry,
+                        tmp.newFolder(),
+                        UUID.randomUUID(),
+                        materializedSstFiles,
+                        rocksDBStateUploader,
+                        lastCompletedCheckpointId);
+
+        return checkpointSnapshotStrategy;
+    }
+
+    public FsCheckpointStreamFactory createFsCheckpointStreamFactory() throws 
IOException {
+        int threshold = 100;
+        File checkpointsDir = tmp.newFolder("checkpointsDir");
+        File sharedStateDir = tmp.newFolder("sharedStateDir");
+        FsCheckpointStreamFactory checkpointStreamFactory =
+                new FsCheckpointStreamFactory(
+                        getSharedInstance(),
+                        fromLocalFile(checkpointsDir),
+                        fromLocalFile(sharedStateDir),
+                        threshold,
+                        threshold);
+        return checkpointStreamFactory;
+    }
+
+    public IncrementalRemoteKeyedStateHandle snapshot(
+            long checkpointId,
+            RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy,
+            FsCheckpointStreamFactory checkpointStreamFactory,
+            CloseableRegistry closeableRegistry)
+            throws Exception {
+
+        RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources 
snapshotResources =
+                checkpointSnapshotStrategy.syncPrepareResources(checkpointId);
+
+        return (IncrementalRemoteKeyedStateHandle)
+                checkpointSnapshotStrategy
+                        .asyncSnapshot(
+                                snapshotResources,
+                                checkpointId,
+                                checkpointId,
+                                checkpointStreamFactory,
+                                
CheckpointOptions.forCheckpointWithDefaultLocation())
+                        .get(closeableRegistry)
+                        .getJobManagerOwnedSnapshot();
+    }
+}

Reply via email to