This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 80066185648a243853b532350393ce93952f49b3 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Mon Feb 8 18:56:27 2021 +0100 [FLINK-21344] Do not store heap timers in raw operator state for a savepoint We no longer serialize heap timers in the RocksDB state backend when taking a savepoint. We still do so for checkpoints, though. There is one gotcha in this PR: StateConfigUtil#isStateImmutableInStateBackend relies on the knowledge that heap timers are handled differently for checkpoints than for savepoints (it queries requiresLegacySynchronousTimerSnapshots with CheckpointType.CHECKPOINT to detect heap-based timers). This closes #14913 --- .../runtime/state/AbstractKeyedStateBackend.java | 3 +- .../state/ttl/mock/MockKeyedStateBackend.java | 3 +- .../streaming/state/RocksDBKeyedStateBackend.java | 6 +- .../state/HeapTimersSnapshottingTest.java | 103 +++++++++++++++++++++ .../contrib/streaming/state/RocksDBTestUtils.java | 11 ++- .../api/operators/InternalTimeServiceManager.java | 12 +-- .../operators/InternalTimeServiceManagerImpl.java | 25 +---- .../api/operators/StreamOperatorStateHandler.java | 9 +- .../BatchExecutionInternalTimeServiceManager.java | 5 - .../util/AbstractStreamOperatorTestHarness.java | 16 +++- .../flink/table/runtime/util/StateConfigUtil.java | 3 +- 11 files changed, 148 insertions(+), 48 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 1ded0dc..6ba970a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; import
org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.heap.InternalKeyContext; import org.apache.flink.runtime.state.internal.InternalKvState; @@ -348,7 +349,7 @@ public abstract class AbstractKeyedStateBackend<K> } // TODO remove this once heap-based timers are working with RocksDB incremental snapshots! - public boolean requiresLegacySynchronousTimerSnapshots() { + public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointOptions) { return false; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java index c946365..d995ba3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -181,7 +182,7 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @Override - public boolean requiresLegacySynchronousTimerSnapshots() { + public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointOptions) { return false; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java 
index 0f53955..9dbc5a6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -40,6 +40,7 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -853,8 +854,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @Override - public boolean requiresLegacySynchronousTimerSnapshots() { - return priorityQueueFactory instanceof HeapPriorityQueueSetFactory; + public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointType) { + return priorityQueueFactory instanceof HeapPriorityQueueSetFactory + && checkpointType == CheckpointType.CHECKPOINT; } /** Rocks DB specific information about the k/v states. */ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java new file mode 100644 index 0000000..94d82fd --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * The tests verify that {@link PriorityQueueStateType#HEAP heap timers} are not serialized into raw + * keyed operator state when taking a savepoint, but they are serialized for checkpoints. The heap + * timers still need to be serialized into the raw operator state because of RocksDB incremental + * checkpoints. 
+ */ +public class HeapTimersSnapshottingTest { + + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testNotSerializingTimersInRawStateForSavepoints() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + getTestHarness()) { + RocksDBStateBackend backend = + new RocksDBStateBackend(temporaryFolder.newFolder().toURI()); + backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP); + testHarness.setStateBackend(backend); + testHarness.open(); + testHarness.processElement(0, 0L); + + OperatorSubtaskState state = + testHarness + .snapshotWithLocalState(0L, 1L, CheckpointType.SAVEPOINT) + .getJobManagerOwnedState(); + assertThat(state.getRawKeyedState().isEmpty(), equalTo(true)); + } + } + + @Test + public void testSerializingTimersInRawStateForCheckpoints() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + getTestHarness()) { + RocksDBStateBackend backend = + new RocksDBStateBackend(temporaryFolder.newFolder().toURI()); + backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP); + testHarness.setStateBackend(backend); + testHarness.open(); + testHarness.processElement(0, 0L); + + OperatorSubtaskState state = + testHarness + .snapshotWithLocalState(0L, 1L, CheckpointType.CHECKPOINT) + .getJobManagerOwnedState(); + assertThat(state.getRawKeyedState().isEmpty(), equalTo(false)); + } + } + + private KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> getTestHarness() + throws Exception { + return new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedProcessOperator<>( + new KeyedProcessFunction<Integer, Integer, Integer>() { + @Override + public void processElement( + Integer value, Context ctx, Collector<Integer> out) { + ctx.timerService().registerEventTimeTimer(0L); + } + }), + (KeySelector<Integer, Integer>) value -> value, + BasicTypeInfo.INT_TYPE_INFO, + 1, + 1, + 0); + } +} diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java index db7dd71..7702f55 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java @@ -45,6 +45,15 @@ public final class RocksDBTestUtils { public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults( File instanceBasePath, TypeSerializer<K> keySerializer) { + return builderForTestDefaults( + instanceBasePath, keySerializer, RocksDBStateBackend.PriorityQueueStateType.HEAP); + } + + public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults( + File instanceBasePath, + TypeSerializer<K> keySerializer, + RocksDBStateBackend.PriorityQueueStateType queueStateType) { + final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer(); return new RocksDBKeyedStateBackendBuilder<>( @@ -59,7 +68,7 @@ public final class RocksDBTestUtils { new KeyGroupRange(0, 1), new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - RocksDBStateBackend.PriorityQueueStateType.HEAP, + queueStateType, TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index 6f89b02..e374fdb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -56,8 +56,7 @@ public 
interface InternalTimeServiceManager<K> { void advanceWatermark(Watermark watermark) throws Exception; /** - * Snapshots the timers to raw keyed state. This should only be called iff {@link - * #isUsingLegacyRawKeyedStateSnapshots()} returns {@code true}. + * Snapshots the timers to raw keyed state. * * <p><b>TODO:</b> This can be removed once heap-based timers are integrated with RocksDB * incremental snapshots. @@ -67,15 +66,6 @@ public interface InternalTimeServiceManager<K> { throws Exception; /** - * Flag indicating whether or not the internal timer services should be checkpointed with legacy - * synchronous snapshots. - * - * <p><b>TODO:</b> This can be removed once heap-based timers are integrated with RocksDB - * incremental snapshots. - */ - boolean isUsingLegacyRawKeyedStateSnapshots(); - - /** * A provider pattern for creating an instance of a {@link InternalTimeServiceManager}. Allows * substituting the manager that will be used at the runtime. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java index 14e9d4f..9288e94 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java @@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; @@ 
-45,7 +44,6 @@ import java.util.HashMap; import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * An entity keeping all the time-related services. Right now, this is only a {@link @@ -75,20 +73,16 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices; - private final boolean useLegacySynchronousSnapshots; - private InternalTimeServiceManagerImpl( KeyGroupRange localKeyGroupRange, KeyContext keyContext, PriorityQueueSetFactory priorityQueueSetFactory, - ProcessingTimeService processingTimeService, - boolean useLegacySynchronousSnapshots) { + ProcessingTimeService processingTimeService) { this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange); this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory); this.keyContext = Preconditions.checkNotNull(keyContext); this.processingTimeService = Preconditions.checkNotNull(processingTimeService); - this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots; this.timerServices = new HashMap<>(); } @@ -106,18 +100,10 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates) throws Exception { final KeyGroupRange keyGroupRange = keyedStatedBackend.getKeyGroupRange(); - final boolean requiresSnapshotLegacyTimers = - keyedStatedBackend instanceof AbstractKeyedStateBackend - && ((AbstractKeyedStateBackend<K>) keyedStatedBackend) - .requiresLegacySynchronousTimerSnapshots(); final InternalTimeServiceManagerImpl<K> timeServiceManager = new InternalTimeServiceManagerImpl<>( - keyGroupRange, - keyContext, - keyedStatedBackend, - processingTimeService, - requiresSnapshotLegacyTimers); + keyGroupRange, keyContext, keyedStatedBackend, processingTimeService); // and then initialize the timer services for 
(KeyGroupStatePartitionStreamProvider streamProvider : rawKeyedStates) { @@ -198,15 +184,8 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan ////////////////// Fault Tolerance Methods /////////////////// @Override - public boolean isUsingLegacyRawKeyedStateSnapshots() { - return useLegacySynchronousSnapshots; - } - - @Override public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream out, String operatorName) throws Exception { - checkState(useLegacySynchronousSnapshots); - try { KeyGroupsList allKeyGroups = out.getKeyGroupList(); for (int keyGroupIdx : allKeyGroups) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index b784382..f45b00c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -30,6 +30,7 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DefaultKeyedStateStore; @@ -194,7 +195,13 @@ public class StreamOperatorStateHandler { "keyedStateBackend should be available with timeServiceManager"); final InternalTimeServiceManager<?> manager = timeServiceManager.get(); - if (manager.isUsingLegacyRawKeyedStateSnapshots()) { + boolean requiresLegacyRawKeyedStateSnapshots = + keyedStateBackend instanceof AbstractKeyedStateBackend + 
&& ((AbstractKeyedStateBackend<?>) keyedStateBackend) + .requiresLegacySynchronousTimerSnapshots( + checkpointOptions.getCheckpointType()); + + if (requiresLegacyRawKeyedStateSnapshots) { checkState( !isUsingCustomRawKeyedState, "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write."); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java index 6338fd4..8666f05 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java @@ -83,11 +83,6 @@ public class BatchExecutionInternalTimeServiceManager<K> throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution"); } - @Override - public boolean isUsingLegacyRawKeyedStateSnapshots() { - throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution"); - } - public static <K> InternalTimeServiceManager<K> create( CheckpointableKeyedStateBackend<K> keyedStatedBackend, ClassLoader userClassloader, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index d82ed51..be0e6b3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner; @@ -654,14 +655,25 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { */ public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp) throws Exception { + return snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT); + } + + /** + * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions, + * org.apache.flink.runtime.state.CheckpointStreamFactory)}. + */ + public OperatorSnapshotFinalizer snapshotWithLocalState( + long checkpointId, long timestamp, CheckpointType checkpointType) throws Exception { + CheckpointStorageLocationReference locationReference = + CheckpointStorageLocationReference.getDefault(); OperatorSnapshotFutures operatorStateResult = operator.snapshotState( checkpointId, timestamp, - CheckpointOptions.forCheckpointWithDefaultLocation(), + new CheckpointOptions(checkpointType, locationReference), checkpointStorageAccess.resolveCheckpointStorageLocation( - checkpointId, CheckpointStorageLocationReference.getDefault())); + checkpointId, locationReference)); return new OperatorSnapshotFinalizer(operatorStateResult); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java index 8be9d55..79e9a63 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java +++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.util; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateBackend; @@ -56,7 +57,7 @@ public class StateConfigUtil { // indicates the underlying uses heap-bsased timer isHeapTimer = ((AbstractKeyedStateBackend<?>) stateBackend) - .requiresLegacySynchronousTimerSnapshots(); + .requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT); } return isRocksDbState && !isHeapTimer; }