This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80066185648a243853b532350393ce93952f49b3
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Feb 8 18:56:27 2021 +0100

    [FLINK-21344] Do not store heap timers in raw operator state for a
    savepoint
    
    We do no longer serialize the heap timers in RocksDB state backend when
    taking a savepoint. We still do it for checkpoints though.
    
    There is one gotcha in the PR, that the 
StateConfigUtil#isStateImmutableInStateBackend
    assumes the knowledge that checkpoints behave differently for heap
    timers than savepoints.
    
    This closes #14913
---
 .../runtime/state/AbstractKeyedStateBackend.java   |   3 +-
 .../state/ttl/mock/MockKeyedStateBackend.java      |   3 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |   6 +-
 .../state/HeapTimersSnapshottingTest.java          | 103 +++++++++++++++++++++
 .../contrib/streaming/state/RocksDBTestUtils.java  |  11 ++-
 .../api/operators/InternalTimeServiceManager.java  |  12 +--
 .../operators/InternalTimeServiceManagerImpl.java  |  25 +----
 .../api/operators/StreamOperatorStateHandler.java  |   9 +-
 .../BatchExecutionInternalTimeServiceManager.java  |   5 -
 .../util/AbstractStreamOperatorTestHarness.java    |  16 +++-
 .../flink/table/runtime/util/StateConfigUtil.java  |   3 +-
 11 files changed, 148 insertions(+), 48 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 1ded0dc..6ba970a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -348,7 +349,7 @@ public abstract class AbstractKeyedStateBackend<K>
     }
 
     // TODO remove this once heap-based timers are working with RocksDB 
incremental snapshots!
-    public boolean requiresLegacySynchronousTimerSnapshots() {
+    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType 
checkpointOptions) {
         return false;
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index c946365..d995ba3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -181,7 +182,7 @@ public class MockKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
     }
 
     @Override
-    public boolean requiresLegacySynchronousTimerSnapshots() {
+    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType 
checkpointOptions) {
         return false;
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0f53955..9dbc5a6 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -40,6 +40,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -853,8 +854,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
     }
 
     @Override
-    public boolean requiresLegacySynchronousTimerSnapshots() {
-        return priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
+    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType 
checkpointType) {
+        return priorityQueueFactory instanceof HeapPriorityQueueSetFactory
+                && checkpointType == CheckpointType.CHECKPOINT;
     }
 
     /** Rocks DB specific information about the k/v states. */
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
new file mode 100644
index 0000000..94d82fd
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * The tests verify that {@link PriorityQueueStateType#HEAP heap timers} are 
not serialized into raw
+ * keyed operator state when taking a savepoint, but they are serialized for 
checkpoints. The heap
+ * timers still need to be serialized into the raw operator state because of 
RocksDB incremental
+ * checkpoints.
+ */
+public class HeapTimersSnapshottingTest {
+
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testNotSerializingTimersInRawStateForSavepoints() throws 
Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> 
testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+            backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
+            testHarness.setStateBackend(backend);
+            testHarness.open();
+            testHarness.processElement(0, 0L);
+
+            OperatorSubtaskState state =
+                    testHarness
+                            .snapshotWithLocalState(0L, 1L, 
CheckpointType.SAVEPOINT)
+                            .getJobManagerOwnedState();
+            assertThat(state.getRawKeyedState().isEmpty(), equalTo(true));
+        }
+    }
+
+    @Test
+    public void testSerializingTimersInRawStateForCheckpoints() throws 
Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> 
testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+            backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
+            testHarness.setStateBackend(backend);
+            testHarness.open();
+            testHarness.processElement(0, 0L);
+
+            OperatorSubtaskState state =
+                    testHarness
+                            .snapshotWithLocalState(0L, 1L, 
CheckpointType.CHECKPOINT)
+                            .getJobManagerOwnedState();
+            assertThat(state.getRawKeyedState().isEmpty(), equalTo(false));
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> 
getTestHarness()
+            throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(
+                        new KeyedProcessFunction<Integer, Integer, Integer>() {
+                            @Override
+                            public void processElement(
+                                    Integer value, Context ctx, 
Collector<Integer> out) {
+                                ctx.timerService().registerEventTimeTimer(0L);
+                            }
+                        }),
+                (KeySelector<Integer, Integer>) value -> value,
+                BasicTypeInfo.INT_TYPE_INFO,
+                1,
+                1,
+                0);
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index db7dd71..7702f55 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -45,6 +45,15 @@ public final class RocksDBTestUtils {
     public static <K> RocksDBKeyedStateBackendBuilder<K> 
builderForTestDefaults(
             File instanceBasePath, TypeSerializer<K> keySerializer) {
 
+        return builderForTestDefaults(
+                instanceBasePath, keySerializer, 
RocksDBStateBackend.PriorityQueueStateType.HEAP);
+    }
+
+    public static <K> RocksDBKeyedStateBackendBuilder<K> 
builderForTestDefaults(
+            File instanceBasePath,
+            TypeSerializer<K> keySerializer,
+            RocksDBStateBackend.PriorityQueueStateType queueStateType) {
+
         final RocksDBResourceContainer optionsContainer = new 
RocksDBResourceContainer();
 
         return new RocksDBKeyedStateBackendBuilder<>(
@@ -59,7 +68,7 @@ public final class RocksDBTestUtils {
                 new KeyGroupRange(0, 1),
                 new ExecutionConfig(),
                 TestLocalRecoveryConfig.disabled(),
-                RocksDBStateBackend.PriorityQueueStateType.HEAP,
+                queueStateType,
                 TtlTimeProvider.DEFAULT,
                 new UnregisteredMetricsGroup(),
                 Collections.emptyList(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 6f89b02..e374fdb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -56,8 +56,7 @@ public interface InternalTimeServiceManager<K> {
     void advanceWatermark(Watermark watermark) throws Exception;
 
     /**
-     * Snapshots the timers to raw keyed state. This should only be called iff 
{@link
-     * #isUsingLegacyRawKeyedStateSnapshots()} returns {@code true}.
+     * Snapshots the timers to raw keyed state.
      *
      * <p><b>TODO:</b> This can be removed once heap-based timers are 
integrated with RocksDB
      * incremental snapshots.
@@ -67,15 +66,6 @@ public interface InternalTimeServiceManager<K> {
             throws Exception;
 
     /**
-     * Flag indicating whether or not the internal timer services should be 
checkpointed with legacy
-     * synchronous snapshots.
-     *
-     * <p><b>TODO:</b> This can be removed once heap-based timers are 
integrated with RocksDB
-     * incremental snapshots.
-     */
-    boolean isUsingLegacyRawKeyedStateSnapshots();
-
-    /**
      * A provider pattern for creating an instance of a {@link 
InternalTimeServiceManager}. Allows
      * substituting the manager that will be used at the runtime.
      */
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
index 14e9d4f..9288e94 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -45,7 +44,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An entity keeping all the time-related services. Right now, this is only a 
{@link
@@ -75,20 +73,16 @@ public class InternalTimeServiceManagerImpl<K> implements 
InternalTimeServiceMan
 
     private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
 
-    private final boolean useLegacySynchronousSnapshots;
-
     private InternalTimeServiceManagerImpl(
             KeyGroupRange localKeyGroupRange,
             KeyContext keyContext,
             PriorityQueueSetFactory priorityQueueSetFactory,
-            ProcessingTimeService processingTimeService,
-            boolean useLegacySynchronousSnapshots) {
+            ProcessingTimeService processingTimeService) {
 
         this.localKeyGroupRange = 
Preconditions.checkNotNull(localKeyGroupRange);
         this.priorityQueueSetFactory = 
Preconditions.checkNotNull(priorityQueueSetFactory);
         this.keyContext = Preconditions.checkNotNull(keyContext);
         this.processingTimeService = 
Preconditions.checkNotNull(processingTimeService);
-        this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;
 
         this.timerServices = new HashMap<>();
     }
@@ -106,18 +100,10 @@ public class InternalTimeServiceManagerImpl<K> implements 
InternalTimeServiceMan
             Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates)
             throws Exception {
         final KeyGroupRange keyGroupRange = 
keyedStatedBackend.getKeyGroupRange();
-        final boolean requiresSnapshotLegacyTimers =
-                keyedStatedBackend instanceof AbstractKeyedStateBackend
-                        && ((AbstractKeyedStateBackend<K>) keyedStatedBackend)
-                                .requiresLegacySynchronousTimerSnapshots();
 
         final InternalTimeServiceManagerImpl<K> timeServiceManager =
                 new InternalTimeServiceManagerImpl<>(
-                        keyGroupRange,
-                        keyContext,
-                        keyedStatedBackend,
-                        processingTimeService,
-                        requiresSnapshotLegacyTimers);
+                        keyGroupRange, keyContext, keyedStatedBackend, 
processingTimeService);
 
         // and then initialize the timer services
         for (KeyGroupStatePartitionStreamProvider streamProvider : 
rawKeyedStates) {
@@ -198,15 +184,8 @@ public class InternalTimeServiceManagerImpl<K> implements 
InternalTimeServiceMan
     //////////////////                         Fault Tolerance Methods         
                ///////////////////
 
     @Override
-    public boolean isUsingLegacyRawKeyedStateSnapshots() {
-        return useLegacySynchronousSnapshots;
-    }
-
-    @Override
     public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream out, 
String operatorName)
             throws Exception {
-        checkState(useLegacySynchronousSnapshots);
-
         try {
             KeyGroupsList allKeyGroups = out.getKeyGroupList();
             for (int keyGroupIdx : allKeyGroups) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
index b784382..f45b00c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.DefaultKeyedStateStore;
@@ -194,7 +195,13 @@ public class StreamOperatorStateHandler {
                         "keyedStateBackend should be available with 
timeServiceManager");
                 final InternalTimeServiceManager<?> manager = 
timeServiceManager.get();
 
-                if (manager.isUsingLegacyRawKeyedStateSnapshots()) {
+                boolean requiresLegacyRawKeyedStateSnapshots =
+                        keyedStateBackend instanceof AbstractKeyedStateBackend
+                                && ((AbstractKeyedStateBackend<?>) 
keyedStateBackend)
+                                        
.requiresLegacySynchronousTimerSnapshots(
+                                                
checkpointOptions.getCheckpointType());
+
+                if (requiresLegacyRawKeyedStateSnapshots) {
                     checkState(
                             !isUsingCustomRawKeyedState,
                             "Attempting to snapshot timers to raw keyed state, 
but this operator has custom raw keyed state to write.");
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
index 6338fd4..8666f05 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
@@ -83,11 +83,6 @@ public class BatchExecutionInternalTimeServiceManager<K>
         throw new UnsupportedOperationException("Checkpoints are not supported 
in BATCH execution");
     }
 
-    @Override
-    public boolean isUsingLegacyRawKeyedStateSnapshots() {
-        throw new UnsupportedOperationException("Checkpoints are not supported 
in BATCH execution");
-    }
-
     public static <K> InternalTimeServiceManager<K> create(
             CheckpointableKeyedStateBackend<K> keyedStatedBackend,
             ClassLoader userClassloader,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index d82ed51..be0e6b3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import 
org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -654,14 +655,25 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
      */
     public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, 
long timestamp)
             throws Exception {
+        return snapshotWithLocalState(checkpointId, timestamp, 
CheckpointType.CHECKPOINT);
+    }
+
+    /**
+     * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions,
+     * org.apache.flink.runtime.state.CheckpointStreamFactory)}.
+     */
+    public OperatorSnapshotFinalizer snapshotWithLocalState(
+            long checkpointId, long timestamp, CheckpointType checkpointType) 
throws Exception {
 
+        CheckpointStorageLocationReference locationReference =
+                CheckpointStorageLocationReference.getDefault();
         OperatorSnapshotFutures operatorStateResult =
                 operator.snapshotState(
                         checkpointId,
                         timestamp,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
+                        new CheckpointOptions(checkpointType, 
locationReference),
                         
checkpointStorageAccess.resolveCheckpointStorageLocation(
-                                checkpointId, 
CheckpointStorageLocationReference.getDefault()));
+                                checkpointId, locationReference));
 
         return new OperatorSnapshotFinalizer(operatorStateResult);
     }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
index 8be9d55..79e9a63 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.util;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 
@@ -56,7 +57,7 @@ public class StateConfigUtil {
             // indicates the underlying uses heap-bsased timer
             isHeapTimer =
                     ((AbstractKeyedStateBackend<?>) stateBackend)
-                            .requiresLegacySynchronousTimerSnapshots();
+                            
.requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT);
         }
         return isRocksDbState && !isHeapTimer;
     }

Reply via email to