This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6675979838ade22cd6e069950bee362ba52ef747
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Fri Oct 23 13:48:24 2020 +0800

    [FLINK-19741] Let timer service skip reading raw keyed state if it isn't 
the writer
    
    This closes #13762.
---
 docs/ops/state/state_backends.md                   |   2 +
 .../state/api/input/KeyedStateInputFormat.java     |   3 +-
 .../api/operators/AbstractStreamOperator.java      |  27 +++-
 .../api/operators/AbstractStreamOperatorV2.java    |  27 +++-
 .../api/operators/StreamTaskStateInitializer.java  |   5 +-
 .../operators/StreamTaskStateInitializerImpl.java  |  20 ++-
 .../api/operators/AbstractStreamOperatorTest.java  | 142 +++++++++++++++++++++
 .../StateInitializationContextImplTest.java        |   3 +-
 .../operators/StreamOperatorStateHandlerTest.java  |   3 +-
 .../StreamTaskStateInitializerImplTest.java        |   6 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |   5 +-
 11 files changed, 231 insertions(+), 12 deletions(-)

diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 6b0ae0e..8dff5d0 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -257,6 +257,8 @@ Set the configuration option 
`state.backend.rocksdb.timer-service.factory` to `h
 
 <span class="label label-info">Note</span> *The combination RocksDB state 
backend with heap-based timers currently does NOT support asynchronous 
snapshots for the timers state. Other state like keyed state is still 
snapshotted asynchronously.*
 
+<span class="label label-info">Note</span> *When using the RocksDB state backend 
with heap-based timers, checkpointing and taking savepoints are expected to fail 
if there are operators in the application that write to raw keyed state.*
+
 ### Enabling RocksDB Native Metrics
 
 You can optionally access RocksDB's native metrics through Flink's metrics 
system, by enabling certain metrics selectively.
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
index 5fd91d8..100d58d 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
@@ -182,7 +182,8 @@ public class KeyedStateInputFormat<K, N, OUT> extends 
RichInputFormat<OUT, KeyGr
                                operator,
                                
operator.getKeyType().createSerializer(environment.getExecutionConfig()),
                                registry,
-                               getRuntimeContext().getMetricGroup());
+                               getRuntimeContext().getMetricGroup(),
+                               false);
                } catch (Exception e) {
                        throw new IOException("Failed to restore state 
backend", e);
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a249d5e..50a6428 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -251,7 +252,8 @@ public abstract class AbstractStreamOperator<OUT>
                                this,
                                keySerializer,
                                streamTaskCloseableRegistry,
-                               metrics);
+                               metrics,
+                               isUsingCustomRawKeyedState());
 
                stateHandler = new StreamOperatorStateHandler(context, 
getExecutionConfig(), streamTaskCloseableRegistry);
                timeServiceManager = context.internalTimerServiceManager();
@@ -260,6 +262,29 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        /**
+        * Indicates whether or not implementations of this class are writing to 
the raw keyed state streams
+        * on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If 
yes, subclasses should
+        * override this method to return {@code true}.
+        *
+        * <p>Subclasses need to explicitly indicate the use of raw keyed state 
because, internally,
+        * the {@link AbstractStreamOperator} may attempt to read from it as 
well to restore heap-based timers and
+        * ultimately fail with read errors. Overriding this method to return 
{@code true} lets the
+        * {@link AbstractStreamOperator} know that the data written in the 
raw keyed state was
+        * not written by the timer services, so that it skips the timer restore 
attempt.
+        *
+        * <p>Please refer to FLINK-19741 for further details.
+        *
+        * <p>TODO: this method can be removed once all timers are moved to be 
managed by state backends.
+        *
+        * @return flag indicating whether or not this operator is writing to 
raw keyed
+        *         state via {@link #snapshotState(StateSnapshotContext)}.
+        */
+       @Internal
+       protected boolean isUsingCustomRawKeyedState() {
+               return false;
+       }
+
+       /**
         * This method is called immediately before any elements are processed, 
it should contain the
         * operator's initialization logic, e.g. state initialization.
         *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 819b254..31eb8ad 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.KeyedStateStore;
@@ -201,7 +202,8 @@ public abstract class AbstractStreamOperatorV2<OUT> 
implements StreamOperator<OU
                                this,
                                keySerializer,
                                cancelables,
-                               metrics);
+                               metrics,
+                               isUsingCustomRawKeyedState());
 
                stateHandler = new StreamOperatorStateHandler(context, 
getExecutionConfig(), cancelables);
                timeServiceManager = context.internalTimerServiceManager();
@@ -209,6 +211,29 @@ public abstract class AbstractStreamOperatorV2<OUT> 
implements StreamOperator<OU
        }
 
        /**
+        * Indicates whether or not implementations of this class are writing to 
the raw keyed state streams
+        * on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If 
yes, subclasses should
+        * override this method to return {@code true}.
+        *
+        * <p>Subclasses need to explicitly indicate the use of raw keyed state 
because, internally,
+        * the {@link AbstractStreamOperatorV2} may attempt to read from it as 
well to restore heap-based timers and
+        * ultimately fail with read errors. Overriding this method to return 
{@code true} lets the
+        * {@link AbstractStreamOperatorV2} know that the data written in the 
raw keyed state was
+        * not written by the timer services, so that it skips the timer restore 
attempt.
+        *
+        * <p>Please refer to FLINK-19741 for further details.
+        *
+        * <p>TODO: this method can be removed once all timers are moved to be 
managed by state backends.
+        *
+        * @return flag indicating whether or not this operator is writing to 
raw keyed
+        *         state via {@link #snapshotState(StateSnapshotContext)}.
+        */
+       @Internal
+       protected boolean isUsingCustomRawKeyedState() {
+               return false;
+       }
+
+       /**
         * This method is called immediately before any elements are processed, 
it should contain the
         * operator's initialization logic, e.g. state initialization.
         *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java
index 35ded9e..712d6ee 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java
@@ -45,6 +45,8 @@ public interface StreamTaskStateInitializer {
         * @param keySerializer the key-serializer for the operator. Can be 
null.
         * @param streamTaskCloseableRegistry the closeable registry to which 
created closeable objects will be registered.
         * @param metricGroup the parent metric group for all statebackend 
metrics
+        * @param isUsingCustomRawKeyedState flag indicating whether or not the 
{@link AbstractStreamOperator} is writing
+        *                                   custom raw keyed state.
         * @return a context from which the given operator can initialize 
everything related to state.
         * @throws Exception when something went wrong while creating the 
context.
         */
@@ -55,5 +57,6 @@ public interface StreamTaskStateInitializer {
                @Nonnull KeyContext keyContext,
                @Nullable TypeSerializer<?> keySerializer,
                @Nonnull CloseableRegistry streamTaskCloseableRegistry,
-               @Nonnull MetricGroup metricGroup) throws Exception;
+               @Nonnull MetricGroup metricGroup,
+               boolean isUsingCustomRawKeyedState) throws Exception;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index ca3ada5..40fb3a3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -58,6 +58,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -117,7 +118,8 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                @Nonnull KeyContext keyContext,
                @Nullable TypeSerializer<?> keySerializer,
                @Nonnull CloseableRegistry streamTaskCloseableRegistry,
-               @Nonnull MetricGroup metricGroup) throws Exception {
+               @Nonnull MetricGroup metricGroup,
+               boolean isUsingCustomRawKeyedState) throws Exception {
 
                TaskInfo taskInfo = environment.getTaskInfo();
                OperatorSubtaskDescriptionText operatorSubtaskDescription =
@@ -164,7 +166,21 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                        
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
 
                        // -------------- Internal Timer Service Manager 
--------------
-                       timeServiceManager = 
internalTimeServiceManager(keyedStatedBackend, keyContext, 
processingTimeService, rawKeyedStateInputs);
+
+                       // if the operator indicates that it is using custom 
raw keyed state,
+                       // then whatever was written in the raw keyed state 
snapshot was NOT written
+                       // by the internal timer services (because there is 
only ever one user of raw keyed state);
+                       // in this case, we should not attempt to restore 
timers from the raw keyed state.
+                       final Iterable<KeyGroupStatePartitionStreamProvider> 
restoredRawKeyedStateTimers =
+                               (prioritizedOperatorSubtaskStates.isRestored() 
&& !isUsingCustomRawKeyedState)
+                                       ? rawKeyedStateInputs
+                                       : Collections.emptyList();
+
+                       timeServiceManager = internalTimeServiceManager(
+                               keyedStatedBackend,
+                               keyContext,
+                               processingTimeService,
+                               restoredRawKeyedStateTimers);
 
                        // -------------- Preparing return value --------------
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index dc30675..6b035df 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -20,12 +20,17 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -33,10 +38,16 @@ import 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import static junit.framework.TestCase.assertTrue;
@@ -67,6 +78,22 @@ public class AbstractStreamOperatorTest {
                        subtaskIndex);
        }
 
+       protected <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT> createTestHarness(
+                       int maxParalelism,
+                       int numSubtasks,
+                       int subtaskIndex,
+                       OneInputStreamOperator<IN, OUT> testOperator,
+                       KeySelector<IN, K> keySelector,
+                       TypeInformation<K> keyTypeInfo) throws Exception {
+               return new KeyedOneInputStreamOperatorTestHarness<>(
+                       testOperator,
+                       keySelector,
+                       keyTypeInfo,
+                       maxParalelism,
+                       numSubtasks,
+                       subtaskIndex);
+       }
+
        @Test
        public void testStateDoesNotInterfere() throws Exception {
                try (KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
@@ -410,6 +437,46 @@ public class AbstractStreamOperatorTest {
                }
        }
 
+       @Test
+       public void testCustomRawKeyedStateSnapshotAndRestore() throws 
Exception {
+               // setup: 10 key groups, all assigned to single subtask
+               final int maxParallelism = 10;
+               final int numSubtasks = 1;
+               final int subtaskIndex = 0;
+               final KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 
maxParallelism - 1);
+
+               final byte[] testSnapshotData = "TEST".getBytes();
+               final CustomRawKeyedStateTestOperator testOperator = new 
CustomRawKeyedStateTestOperator(testSnapshotData);
+
+               // snapshot and then restore
+               OperatorSubtaskState snapshot;
+               try (KeyedOneInputStreamOperatorTestHarness<String, String, 
String> testHarness = createTestHarness(
+                               maxParallelism,
+                               numSubtasks,
+                               subtaskIndex,
+                               testOperator,
+                               input -> input,
+                               BasicTypeInfo.STRING_TYPE_INFO)) {
+                       testHarness.setup();
+                       testHarness.open();
+                       snapshot = testHarness.snapshot(0, 0);
+               }
+
+               try (KeyedOneInputStreamOperatorTestHarness<String, String, 
String> testHarness = createTestHarness(
+                               maxParallelism,
+                               numSubtasks,
+                               subtaskIndex,
+                               testOperator,
+                               input -> input,
+                               BasicTypeInfo.STRING_TYPE_INFO)) {
+                       testHarness.setup();
+                       testHarness.initializeState(snapshot);
+                       testHarness.open();
+               }
+
+               assertThat(testOperator.restoredRawKeyedState, 
hasRestoredKeyGroupsWith(testSnapshotData, keyGroupRange));
+       }
+
        /**
         * Extracts the result values from the test harness and clears the 
output queue.
         */
@@ -501,6 +568,57 @@ public class AbstractStreamOperatorTest {
                }
        }
 
+       /**
+        * Operator that writes arbitrary bytes to raw keyed state on snapshots.
+        */
+       private static class CustomRawKeyedStateTestOperator
+               extends AbstractStreamOperator<String>
+               implements OneInputStreamOperator<String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final byte[] snapshotBytes;
+
+               private Map<Integer, byte[]> restoredRawKeyedState;
+
+               CustomRawKeyedStateTestOperator(byte[] snapshotBytes) {
+                       this.snapshotBytes = Arrays.copyOf(snapshotBytes, 
snapshotBytes.length);
+               }
+
+               @Override
+               public void processElement(StreamRecord<String> element) throws 
Exception {
+                       // do nothing
+               }
+
+               @Override
+               protected boolean isUsingCustomRawKeyedState() {
+                       return true;
+               }
+
+               @Override
+               public void snapshotState(StateSnapshotContext context) throws 
Exception {
+                       super.snapshotState(context);
+                       KeyedStateCheckpointOutputStream rawKeyedStateStream = 
context.getRawKeyedOperatorStateOutput();
+                       for (int keyGroupId : 
rawKeyedStateStream.getKeyGroupList()) {
+                               
rawKeyedStateStream.startNewKeyGroup(keyGroupId);
+                               rawKeyedStateStream.write(snapshotBytes);
+                       }
+                       rawKeyedStateStream.close();
+               }
+
+               @Override
+               public void initializeState(StateInitializationContext context) 
throws Exception {
+                       super.initializeState(context);
+
+                       restoredRawKeyedState = new HashMap<>();
+                       for (KeyGroupStatePartitionStreamProvider 
streamProvider : context.getRawKeyedStateInputs()) {
+                               byte[] readBuffer = new 
byte[snapshotBytes.length];
+                               int ignored = 
streamProvider.getStream().read(readBuffer);
+                               
restoredRawKeyedState.put(streamProvider.getKeyGroupId(), readBuffer);
+                       }
+               }
+       }
+
        private static int getKeyInKeyGroupRange(KeyGroupRange range, int 
maxParallelism) {
                Random rand = new Random(System.currentTimeMillis());
                int result = rand.nextInt();
@@ -509,4 +627,28 @@ public class AbstractStreamOperatorTest {
                }
                return result;
        }
+
+       private static Matcher<Map<Integer, byte[]>> 
hasRestoredKeyGroupsWith(byte[] testSnapshotData, KeyGroupRange range) {
+               return new TypeSafeMatcher<Map<Integer, byte[]>>() {
+                       @Override
+                       protected boolean matchesSafely(Map<Integer, byte[]> 
restored) {
+                               if (restored.size() != 
range.getNumberOfKeyGroups()) {
+                                       return false;
+                               }
+
+                               for (int writtenKeyGroupId : range) {
+                                       if 
(!Arrays.equals(restored.get(writtenKeyGroupId), testSnapshotData)) {
+                                               return false;
+                                       }
+                               }
+
+                               return true;
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description.appendText("Key groups: " + range + 
" with snapshot data " + Arrays.toString(testSnapshotData));
+                       }
+               };
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index a8c8da9..0d171f0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -199,7 +199,8 @@ public class StateInitializationContextImplTest {
                        // consumed by the timer service.
                        IntSerializer.INSTANCE,
                        closableRegistry,
-                       new UnregisteredMetricsGroup());
+                       new UnregisteredMetricsGroup(),
+                       false);
 
                this.initializationContext =
                                new StateInitializationContextImpl(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
index b1a2fd5..62f3b42 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
@@ -103,7 +103,8 @@ public class StreamOperatorStateHandlerTest {
                                new UnUsedKeyContext(),
                                IntSerializer.INSTANCE,
                                closeableRegistry,
-                               new InterceptingOperatorMetricGroup());
+                               new InterceptingOperatorMetricGroup(),
+                               false);
                        StreamOperatorStateHandler stateHandler = new 
StreamOperatorStateHandler(stateContext, new ExecutionConfig(), 
closeableRegistry);
 
                        final String keyedStateField = "keyedStateField";
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 1f32901..5677733 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -100,7 +100,8 @@ public class StreamTaskStateInitializerImplTest {
                        streamOperator,
                        typeSerializer,
                        closeableRegistry,
-                       new UnregisteredMetricsGroup());
+                       new UnregisteredMetricsGroup(),
+                       false);
 
                OperatorStateBackend operatorStateBackend = 
stateContext.operatorStateBackend();
                AbstractKeyedStateBackend<?> keyedStateBackend = 
stateContext.keyedStateBackend();
@@ -211,7 +212,8 @@ public class StreamTaskStateInitializerImplTest {
                        streamOperator,
                        typeSerializer,
                        closeableRegistry,
-                       new UnregisteredMetricsGroup());
+                       new UnregisteredMetricsGroup(),
+                       false);
 
                OperatorStateBackend operatorStateBackend = 
stateContext.operatorStateBackend();
                AbstractKeyedStateBackend<?> keyedStateBackend = 
stateContext.keyedStateBackend();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 09af3af..7d4d9bc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1601,7 +1601,7 @@ public class StreamTaskTest extends TestLogger {
                @Override
                public StreamTaskStateInitializer 
createStreamTaskStateInitializer() {
                        final StreamTaskStateInitializer streamTaskStateManager 
= super.createStreamTaskStateInitializer();
-                       return (operatorID, operatorClassName, 
processingTimeService, keyContext, keySerializer, closeableRegistry, 
metricGroup) -> {
+                       return (operatorID, operatorClassName, 
processingTimeService, keyContext, keySerializer, closeableRegistry, 
metricGroup, isUsingCustomRawKeyedState) -> {
 
                                final StreamOperatorStateContext controller = 
streamTaskStateManager.streamOperatorStateContext(
                                        operatorID,
@@ -1610,7 +1610,8 @@ public class StreamTaskTest extends TestLogger {
                                        keyContext,
                                        keySerializer,
                                        closeableRegistry,
-                                       metricGroup);
+                                       metricGroup,
+                                       isUsingCustomRawKeyedState);
 
                                return new StreamOperatorStateContext() {
                                        @Override

Reply via email to