This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new afcba03  Revert "[FLINK-23949][checkpoint] Fix first incremental checkpoint after a savepoint degenerate into a full checkpoint"
afcba03 is described below

commit afcba034a4e7f92ce208938f9279f289dcfe0aa5
Author: Yun Tang <tang...@apache.org>
AuthorDate: Wed Sep 8 16:21:48 2021 +0800

    Revert "[FLINK-23949][checkpoint] Fix first incremental checkpoint after a savepoint degenerate into a full checkpoint"
    
    This reverts commit 5f12f4ce88a4e83473e778eb30742ab88d92bdb0.
---
 .../snapshot/RocksIncrementalSnapshotStrategy.java |   6 +-
 .../RocksIncrementalSnapshotStrategyTest.java      | 194 ---------------------
 2 files changed, 1 insertion(+), 199 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 59a9c86..8e01a73 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -174,11 +174,7 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
     @Override
     public void notifyCheckpointComplete(long completedCheckpointId) {
         synchronized (materializedSstFiles) {
-            // FLINK-23949: 
materializedSstFiles.keySet().contains(completedCheckpointId) make sure
-            // the notified checkpointId is not a savepoint, otherwise next 
checkpoint will
-            // degenerate into a full checkpoint
-            if (completedCheckpointId > lastCompletedCheckpointId
-                    && 
materializedSstFiles.keySet().contains(completedCheckpointId)) {
+            if (completedCheckpointId > lastCompletedCheckpointId) {
                 materializedSstFiles
                         .keySet()
                         .removeIf(checkpointId -> checkpointId < 
completedCheckpointId);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
deleted file mode 100644
index 69142f8..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.snapshot;
-
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.contrib.streaming.state.RocksDBOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBResource;
-import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.state.ArrayListSerializer;
-import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
-import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
-import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
-import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
-import org.apache.flink.util.ResourceGuard;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import static org.apache.flink.core.fs.Path.fromLocalFile;
-import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
-
-/** Tests for {@link RocksIncrementalSnapshotStrategy}. */
-public class RocksIncrementalSnapshotStrategyTest {
-
-    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
-
-    @Rule public RocksDBResource rocksDBResource = new RocksDBResource();
-
-    // Verify the next checkpoint is still incremental after a savepoint 
completed.
-    @Test
-    public void testCheckpointIsIncremental() throws Exception {
-
-        try (CloseableRegistry closeableRegistry = new CloseableRegistry();
-                RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
-                        createSnapshotStrategy(closeableRegistry)) {
-            FsCheckpointStreamFactory checkpointStreamFactory = 
createFsCheckpointStreamFactory();
-
-            // make and notify checkpoint with id 1
-            snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, 
closeableRegistry);
-            checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
-
-            // notify savepoint with id 2
-            checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
-
-            // make checkpoint with id 3
-            IncrementalRemoteKeyedStateHandle 
incrementalRemoteKeyedStateHandle3 =
-                    snapshot(
-                            3L,
-                            checkpointSnapshotStrategy,
-                            checkpointStreamFactory,
-                            closeableRegistry);
-
-            // If 3rd checkpoint's placeholderStateHandleCount > 0,it means 
3rd checkpoint is
-            // incremental.
-            Map<StateHandleID, StreamStateHandle> sharedState3 =
-                    incrementalRemoteKeyedStateHandle3.getSharedState();
-            long placeholderStateHandleCount =
-                    sharedState3.entrySet().stream()
-                            .filter(e -> e.getValue() instanceof 
PlaceholderStreamStateHandle)
-                            .count();
-
-            Assert.assertTrue(placeholderStateHandleCount > 0);
-        }
-    }
-
-    public RocksIncrementalSnapshotStrategy createSnapshotStrategy(
-            CloseableRegistry closeableRegistry) throws IOException, 
RocksDBException {
-
-        ColumnFamilyHandle columnFamilyHandle = 
rocksDBResource.createNewColumnFamily("test");
-        RocksDB rocksDB = rocksDBResource.getRocksDB();
-        byte[] key = "checkpoint".getBytes();
-        byte[] val = "incrementalTest".getBytes();
-        rocksDB.put(columnFamilyHandle, key, val);
-
-        // construct RocksIncrementalSnapshotStrategy
-        long lastCompletedCheckpointId = -1L;
-        ResourceGuard rocksDBResourceGuard = new ResourceGuard();
-        SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new 
TreeMap<>();
-        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation =
-                new LinkedHashMap<>();
-
-        RocksDBStateUploader rocksDBStateUploader =
-                new RocksDBStateUploader(
-                        
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue());
-
-        int keyGroupPrefixBytes =
-                
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2);
-
-        RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> 
metaInfo =
-                new RegisteredKeyValueStateBackendMetaInfo<>(
-                        StateDescriptor.Type.VALUE,
-                        "test",
-                        IntSerializer.INSTANCE,
-                        new ArrayListSerializer<>(IntSerializer.INSTANCE));
-
-        RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo =
-                new 
RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfo);
-        kvStateInformation.putIfAbsent("test", rocksDbKvStateInfo);
-
-        RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
-                new RocksIncrementalSnapshotStrategy(
-                        rocksDB,
-                        rocksDBResourceGuard,
-                        IntSerializer.INSTANCE,
-                        kvStateInformation,
-                        new KeyGroupRange(0, 1),
-                        keyGroupPrefixBytes,
-                        TestLocalRecoveryConfig.disabled(),
-                        closeableRegistry,
-                        tmp.newFolder(),
-                        UUID.randomUUID(),
-                        materializedSstFiles,
-                        rocksDBStateUploader,
-                        lastCompletedCheckpointId);
-
-        return checkpointSnapshotStrategy;
-    }
-
-    public FsCheckpointStreamFactory createFsCheckpointStreamFactory() throws 
IOException {
-        int threshold = 100;
-        File checkpointsDir = tmp.newFolder("checkpointsDir");
-        File sharedStateDir = tmp.newFolder("sharedStateDir");
-        FsCheckpointStreamFactory checkpointStreamFactory =
-                new FsCheckpointStreamFactory(
-                        getSharedInstance(),
-                        fromLocalFile(checkpointsDir),
-                        fromLocalFile(sharedStateDir),
-                        threshold,
-                        threshold);
-        return checkpointStreamFactory;
-    }
-
-    public IncrementalRemoteKeyedStateHandle snapshot(
-            long checkpointId,
-            RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy,
-            FsCheckpointStreamFactory checkpointStreamFactory,
-            CloseableRegistry closeableRegistry)
-            throws Exception {
-
-        RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources 
snapshotResources =
-                checkpointSnapshotStrategy.syncPrepareResources(checkpointId);
-
-        return (IncrementalRemoteKeyedStateHandle)
-                checkpointSnapshotStrategy
-                        .asyncSnapshot(
-                                snapshotResources,
-                                checkpointId,
-                                checkpointId,
-                                checkpointStreamFactory,
-                                
CheckpointOptions.forCheckpointWithDefaultLocation())
-                        .get(closeableRegistry)
-                        .getJobManagerOwnedSnapshot();
-    }
-}

Reply via email to