This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6675979838ade22cd6e069950bee362ba52ef747 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Fri Oct 23 13:48:24 2020 +0800 [FLINK-19741] Let timer service skip reading raw keyed state if it isn't the writer This closes #13762. --- docs/ops/state/state_backends.md | 2 + .../state/api/input/KeyedStateInputFormat.java | 3 +- .../api/operators/AbstractStreamOperator.java | 27 +++- .../api/operators/AbstractStreamOperatorV2.java | 27 +++- .../api/operators/StreamTaskStateInitializer.java | 5 +- .../operators/StreamTaskStateInitializerImpl.java | 20 ++- .../api/operators/AbstractStreamOperatorTest.java | 142 +++++++++++++++++++++ .../StateInitializationContextImplTest.java | 3 +- .../operators/StreamOperatorStateHandlerTest.java | 3 +- .../StreamTaskStateInitializerImplTest.java | 6 +- .../streaming/runtime/tasks/StreamTaskTest.java | 5 +- 11 files changed, 231 insertions(+), 12 deletions(-) diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md index 6b0ae0e..8dff5d0 100644 --- a/docs/ops/state/state_backends.md +++ b/docs/ops/state/state_backends.md @@ -257,6 +257,8 @@ Set the configuration option `state.backend.rocksdb.timer-service.factory` to `h <span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state. Other state like keyed state is still snapshotted asynchronously.* +<span class="label label-info">Note</span> *When using the RocksDB state backend with heap-based timers, checkpointing and taking savepoints are expected to fail if there are operators in the application that write to raw keyed state.* + ### Enabling RocksDB Native Metrics You can optionally access RockDB's native metrics through Flink's metrics system, by enabling certain metrics selectively. 
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java index 5fd91d8..100d58d 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java @@ -182,7 +182,8 @@ public class KeyedStateInputFormat<K, N, OUT> extends RichInputFormat<OUT, KeyGr operator, operator.getKeyType().createSerializer(environment.getExecutionConfig()), registry, - getRuntimeContext().getMetricGroup()); + getRuntimeContext().getMetricGroup(), + false); } catch (Exception e) { throw new IOException("Failed to restore state backend", e); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a249d5e..50a6428 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; @@ -251,7 +252,8 @@ public abstract class AbstractStreamOperator<OUT> this, keySerializer, streamTaskCloseableRegistry, - metrics); + metrics, + isUsingCustomRawKeyedState()); stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry); timeServiceManager = context.internalTimerServiceManager(); @@ -260,6 
+262,29 @@ public abstract class AbstractStreamOperator<OUT> } /** + * Indicates whether or not implementations of this class are writing to the raw keyed state streams + * on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If yes, subclasses should + * override this method to return {@code true}. + * + * <p>Subclasses need to explicitly indicate the use of raw keyed state because, internally, + * the {@link AbstractStreamOperator} may attempt to read from it as well to restore heap-based timers and + * ultimately fail with read errors. By setting this flag to {@code true}, this allows the + * {@link AbstractStreamOperator} to know that the data written in the raw keyed states were + * not written by the timer services, and skips the timer restore attempt. + * + * <p>Please refer to FLINK-19741 for further details. + * + * <p>TODO: this method can be removed once all timers are moved to be managed by state backends. + * + * @return flag indicating whether or not this operator is writing to raw keyed + * state via {@link #snapshotState(StateSnapshotContext)}. + */ + @Internal + protected boolean isUsingCustomRawKeyedState() { + return false; + } + + /** + * This method is called immediately before any elements are processed, it should contain the + * operator's initialization logic, e.g. state initialization. 
* diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 819b254..31eb8ad 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.KeyedStateStore; @@ -201,7 +202,8 @@ public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OU this, keySerializer, cancelables, - metrics); + metrics, + isUsingCustomRawKeyedState()); stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables); timeServiceManager = context.internalTimerServiceManager(); @@ -209,6 +211,29 @@ public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OU } /** + * Indicates whether or not implementations of this class are writing to the raw keyed state streams + * on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If yes, subclasses should + * override this method to return {@code true}. + * + * <p>Subclasses need to explicitly indicate the use of raw keyed state because, internally, + * the {@link AbstractStreamOperatorV2} may attempt to read from it as well to restore heap-based timers and + * ultimately fail with read errors. By setting this flag to {@code true}, this allows the + * {@link AbstractStreamOperatorV2} to know that the data written in the raw keyed states were + * not written by the timer services, and skips the timer restore attempt. 
+ * + * <p>Please refer to FLINK-19741 for further details. + * + * <p>TODO: this method can be removed once all timers are moved to be managed by state backends. + * + * @return flag indicating whether or not this operator is writing to raw keyed + * state via {@link #snapshotState(StateSnapshotContext)}. + */ + @Internal + protected boolean isUsingCustomRawKeyedState() { + return false; + } + + /** * This method is called immediately before any elements are processed, it should contain the * operator's initialization logic, e.g. state initialization. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java index 35ded9e..712d6ee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java @@ -45,6 +45,8 @@ public interface StreamTaskStateInitializer { * @param keySerializer the key-serializer for the operator. Can be null. * @param streamTaskCloseableRegistry the closeable registry to which created closeable objects will be registered. * @param metricGroup the parent metric group for all statebackend metrics + * @param isUsingCustomRawKeyedState flag indicating whether or not the {@link AbstractStreamOperator} is writing + * custom raw keyed state. * @return a context from which the given operator can initialize everything related to state. * @throws Exception when something went wrong while creating the context. 
*/ @@ -55,5 +57,6 @@ public interface StreamTaskStateInitializer { @Nonnull KeyContext keyContext, @Nullable TypeSerializer<?> keySerializer, @Nonnull CloseableRegistry streamTaskCloseableRegistry, - @Nonnull MetricGroup metricGroup) throws Exception; + @Nonnull MetricGroup metricGroup, + boolean isUsingCustomRawKeyedState) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index ca3ada5..40fb3a3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -58,6 +58,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -117,7 +118,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize @Nonnull KeyContext keyContext, @Nullable TypeSerializer<?> keySerializer, @Nonnull CloseableRegistry streamTaskCloseableRegistry, - @Nonnull MetricGroup metricGroup) throws Exception { + @Nonnull MetricGroup metricGroup, + boolean isUsingCustomRawKeyedState) throws Exception { TaskInfo taskInfo = environment.getTaskInfo(); OperatorSubtaskDescriptionText operatorSubtaskDescription = @@ -164,7 +166,21 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); // -------------- Internal Timer Service Manager -------------- - timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, processingTimeService, rawKeyedStateInputs); + + // if the operator indicates that 
it is using custom raw keyed state, + // then whatever was written in the raw keyed state snapshot was NOT written + // by the internal timer services (because there is only ever one user of raw keyed state); + // in this case, we should not attempt to restore timers from the raw keyed state. + final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers = + (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState) + ? rawKeyedStateInputs + : Collections.emptyList(); + + timeServiceManager = internalTimeServiceManager( + keyedStatedBackend, + keyContext, + processingTimeService, + restoredRawKeyedStateTimers); // -------------- Preparing return value -------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index dc30675..6b035df 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -20,12 +20,17 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import 
org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -33,10 +38,16 @@ import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import static junit.framework.TestCase.assertTrue; @@ -67,6 +78,22 @@ public class AbstractStreamOperatorTest { subtaskIndex); } + protected <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> createTestHarness( + int maxParalelism, + int numSubtasks, + int subtaskIndex, + OneInputStreamOperator<IN, OUT> testOperator, + KeySelector<IN, K> keySelector, + TypeInformation<K> keyTypeInfo) throws Exception { + return new KeyedOneInputStreamOperatorTestHarness<>( + testOperator, + keySelector, + keyTypeInfo, + maxParalelism, + numSubtasks, + subtaskIndex); + } + @Test public void testStateDoesNotInterfere() throws Exception { try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) { @@ -410,6 +437,46 @@ public class AbstractStreamOperatorTest { } } + @Test + public void testCustomRawKeyedStateSnapshotAndRestore() throws Exception { + // setup: 10 key groups, all assigned to single subtask + final int maxParallelism = 10; + final int numSubtasks = 1; + final int subtaskIndex = 0; + final KeyGroupRange keyGroupRange = KeyGroupRange.of(0, maxParallelism - 1); + + final byte[] 
testSnapshotData = "TEST".getBytes(); + final CustomRawKeyedStateTestOperator testOperator = new CustomRawKeyedStateTestOperator(testSnapshotData); + + // snapshot and then restore + OperatorSubtaskState snapshot; + try (KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = createTestHarness( + maxParallelism, + numSubtasks, + subtaskIndex, + testOperator, + input -> input, + BasicTypeInfo.STRING_TYPE_INFO)) { + testHarness.setup(); + testHarness.open(); + snapshot = testHarness.snapshot(0, 0); + } + + try (KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = createTestHarness( + maxParallelism, + numSubtasks, + subtaskIndex, + testOperator, + input -> input, + BasicTypeInfo.STRING_TYPE_INFO)) { + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + } + + assertThat(testOperator.restoredRawKeyedState, hasRestoredKeyGroupsWith(testSnapshotData, keyGroupRange)); + } + /** * Extracts the result values form the test harness and clear the output queue. */ @@ -501,6 +568,57 @@ public class AbstractStreamOperatorTest { } } + /** + * Operator that writes arbitrary bytes to raw keyed state on snapshots. 
+ */ + private static class CustomRawKeyedStateTestOperator + extends AbstractStreamOperator<String> + implements OneInputStreamOperator<String, String> { + + private static final long serialVersionUID = 1L; + + private final byte[] snapshotBytes; + + private Map<Integer, byte[]> restoredRawKeyedState; + + CustomRawKeyedStateTestOperator(byte[] snapshotBytes) { + this.snapshotBytes = Arrays.copyOf(snapshotBytes, snapshotBytes.length); + } + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + // do nothing + } + + @Override + protected boolean isUsingCustomRawKeyedState() { + return true; + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + KeyedStateCheckpointOutputStream rawKeyedStateStream = context.getRawKeyedOperatorStateOutput(); + for (int keyGroupId : rawKeyedStateStream.getKeyGroupList()) { + rawKeyedStateStream.startNewKeyGroup(keyGroupId); + rawKeyedStateStream.write(snapshotBytes); + } + rawKeyedStateStream.close(); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + restoredRawKeyedState = new HashMap<>(); + for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) { + byte[] readBuffer = new byte[snapshotBytes.length]; + int ignored = streamProvider.getStream().read(readBuffer); + restoredRawKeyedState.put(streamProvider.getKeyGroupId(), readBuffer); + } + } + } + private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) { Random rand = new Random(System.currentTimeMillis()); int result = rand.nextInt(); @@ -509,4 +627,28 @@ public class AbstractStreamOperatorTest { } return result; } + + private static Matcher<Map<Integer, byte[]>> hasRestoredKeyGroupsWith(byte[] testSnapshotData, KeyGroupRange range) { + return new TypeSafeMatcher<Map<Integer, byte[]>>() { + @Override + protected boolean 
matchesSafely(Map<Integer, byte[]> restored) { + if (restored.size() != range.getNumberOfKeyGroups()) { + return false; + } + + for (int writtenKeyGroupId : range) { + if (!Arrays.equals(restored.get(writtenKeyGroupId), testSnapshotData)) { + return false; + } + } + + return true; + } + + @Override + public void describeTo(Description description) { + description.appendText("Key groups: " + range + " with snapshot data " + Arrays.toString(testSnapshotData)); + } + }; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index a8c8da9..0d171f0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -199,7 +199,8 @@ public class StateInitializationContextImplTest { // consumed by the timer service. 
IntSerializer.INSTANCE, closableRegistry, - new UnregisteredMetricsGroup()); + new UnregisteredMetricsGroup(), + false); this.initializationContext = new StateInitializationContextImpl( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java index b1a2fd5..62f3b42 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java @@ -103,7 +103,8 @@ public class StreamOperatorStateHandlerTest { new UnUsedKeyContext(), IntSerializer.INSTANCE, closeableRegistry, - new InterceptingOperatorMetricGroup()); + new InterceptingOperatorMetricGroup(), + false); StreamOperatorStateHandler stateHandler = new StreamOperatorStateHandler(stateContext, new ExecutionConfig(), closeableRegistry); final String keyedStateField = "keyedStateField"; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index 1f32901..5677733 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -100,7 +100,8 @@ public class StreamTaskStateInitializerImplTest { streamOperator, typeSerializer, closeableRegistry, - new UnregisteredMetricsGroup()); + new UnregisteredMetricsGroup(), + false); OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); AbstractKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend(); @@ -211,7 +212,8 
@@ public class StreamTaskStateInitializerImplTest { streamOperator, typeSerializer, closeableRegistry, - new UnregisteredMetricsGroup()); + new UnregisteredMetricsGroup(), + false); OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); AbstractKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 09af3af..7d4d9bc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -1601,7 +1601,7 @@ public class StreamTaskTest extends TestLogger { @Override public StreamTaskStateInitializer createStreamTaskStateInitializer() { final StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer(); - return (operatorID, operatorClassName, processingTimeService, keyContext, keySerializer, closeableRegistry, metricGroup) -> { + return (operatorID, operatorClassName, processingTimeService, keyContext, keySerializer, closeableRegistry, metricGroup, isUsingCustomRawKeyedState) -> { final StreamOperatorStateContext controller = streamTaskStateManager.streamOperatorStateContext( operatorID, @@ -1610,7 +1610,8 @@ public class StreamTaskTest extends TestLogger { keyContext, keySerializer, closeableRegistry, - metricGroup); + metricGroup, + isUsingCustomRawKeyedState); return new StreamOperatorStateContext() { @Override
