Repository: flink Updated Branches: refs/heads/master 0182141d4 -> 2ef4900aa
[FLINK-6324] [DataStream] Refine OperatorStateStore interface This commit refines the OperatorStateStore interface by: - deprecating Java serialization shortcuts - renaming getOperatorState to getListState The Java serialization shortcuts are deprecated because they were previously introduced to provide a smoother migration path from older savepoints. However, their usage should definitely be discouraged. Renaming to getListState prepares for making the names of state access methods convey both the state's redistribution pattern on restore and the shape of its data structure, since these two properties are orthogonal. This convention will also provide a better naming pattern for more state access methods in the future, for example getUnionListState. If the method name does not contain its redistribution pattern (e.g., getListState), then it simply implies the default repartitioning scheme (SPLIT_DISTRIBUTE). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1aab642 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1aab642 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1aab642 Branch: refs/heads/master Commit: a1aab64231f3177954404a1d160d8932d76b2826 Parents: 0182141 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Authored: Wed Apr 19 02:19:48 2017 +0800 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Wed Apr 19 10:32:08 2017 +0800 ---------------------------------------------------------------------- .../kafka/FlinkKafkaConsumerBaseTest.java | 6 +- .../api/common/state/OperatorStateStore.java | 40 +++++- .../ProcTimeUnboundedNonPartitionedOver.scala | 2 +- .../state/DefaultOperatorStateBackend.java | 143 +++++++++++-------- .../runtime/state/OperatorStateBackendTest.java | 20 +-- .../ContinuousFileMonitoringFunction.java | 2 +- .../functions/source/FromElementsFunction.java | 2 +- .../source/StatefulSequenceSource.java | 2 +- 
.../api/operators/async/AsyncWaitOperator.java | 4 +- .../StreamOperatorSnapshotRestoreTest.java | 11 +- .../WrappingFunctionSnapshotRestoreTest.java | 6 +- .../runtime/tasks/OneInputStreamTaskTest.java | 4 +- .../test/checkpointing/RescalingITCase.java | 7 +- 13 files changed, 158 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 123c2be..4f5b283 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -109,7 +109,7 @@ public class FlinkKafkaConsumerBaseTest { FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false); OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>(); - when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); + when(operatorStateStore.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1)); @@ -320,7 +320,7 @@ public class FlinkKafkaConsumerBaseTest { OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); listState = new 
TestingListState<>(); - when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); + when(operatorStateStore.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); // create 500 snapshots for (int i = 100; i < 600; i++) { @@ -444,7 +444,7 @@ public class FlinkKafkaConsumerBaseTest { OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); listState = new TestingListState<>(); - when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); + when(operatorStateStore.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); // create 500 snapshots for (int i = 100; i < 600; i++) { http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java index c1cdfe4..8be04fc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java @@ -41,6 +41,34 @@ public interface OperatorStateStore { * @return A list for all state partitions. * @throws Exception */ + <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception; + + /** + * Returns a set with the names of all currently registered states. + * @return set of names for all registered states. + */ + Set<String> getRegisteredStateNames(); + + // ------------------------------------------------------------------------------------------- + // Deprecated methods + // ------------------------------------------------------------------------------------------- + + /** + * Creates (or restores) a list state. 
Each state is registered under a unique name. + * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore). + * + * The items in the list are repartitionable by the system in case of changed operator parallelism. + * + * @param stateDescriptor The descriptor for this state, providing a name and serializer. + * @param <S> The generic type of the state + * + * @return A list for all state partitions. + * @throws Exception + * + * @deprecated since 1.3.0. This was deprecated as part of a refinement to the function names. + * Please use {@link #getListState(ListStateDescriptor)} instead. + */ + @Deprecated <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception; /** @@ -48,17 +76,15 @@ public interface OperatorStateStore { * are repartitionable by the system in case of changed operator parallelism. * * <p>This is a simple convenience method. For more flexibility on how state serialization - * should happen, use the {@link #getOperatorState(ListStateDescriptor)} method. + * should happen, use the {@link #getListState(ListStateDescriptor)} method. * * @param stateName The name of state to create * @return A list state using Java serialization to serialize state objects. * @throws Exception + * + * @deprecated since 1.3.0. Using Java serialization for persisting state is not encouraged. + * Please use {@link #getListState(ListStateDescriptor)} instead. */ + @Deprecated <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception; - - /** - * Returns a set with the names of all currently registered states. - * @return set of names for all registered states. 
- */ - Set<String> getRegisteredStateNames(); } http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala index 6b9800b..75209db 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala @@ -91,6 +91,6 @@ class ProcTimeUnboundedNonPartitionedOver( override def initializeState(context: FunctionInitializationContext): Unit = { val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType) - state = context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor) + state = context.getOperatorStateStore.getListState(accumulatorsDescriptor) } } http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index ca7cb48..42d4519 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -86,15 +86,13 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend 
{ registeredStates.clear(); } - @SuppressWarnings("unchecked") - @Override - public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception { - return (ListState<T>) getOperatorState(new ListStateDescriptor<>(stateName, javaSerializer)); - } + // ------------------------------------------------------------------------------------------- + // State access methods + // ------------------------------------------------------------------------------------------- @Override - public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws IOException { - return getOperatorState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE); + public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception { + return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE); } @SuppressWarnings("unchecked") @@ -103,64 +101,38 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } public <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception { - return getOperatorState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST); + return getListState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST); } - private <S> ListState<S> getOperatorState( - ListStateDescriptor<S> stateDescriptor, - OperatorStateHandle.Mode mode) throws IOException { - - Preconditions.checkNotNull(stateDescriptor); - - stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig()); - - String name = Preconditions.checkNotNull(stateDescriptor.getName()); - TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer()); + // ------------------------------------------------------------------------------------------- + // Deprecated state access methods + // ------------------------------------------------------------------------------------------- - @SuppressWarnings("unchecked") 
- PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name); - - if (null == partitionableListState) { - - partitionableListState = new PartitionableListState<>( - name, - partitionStateSerializer, - mode); - - registeredStates.put(name, partitionableListState); - } else { - Preconditions.checkState( - partitionableListState.getAssignmentMode().equals(mode), - "Incompatible assignment mode. Provided: " + mode + ", expected: " + - partitionableListState.getAssignmentMode()); - Preconditions.checkState( - stateDescriptor.getElementSerializer(). - canRestoreFrom(partitionableListState.getPartitionStateSerializer()), - "Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() + - ", found: " + partitionableListState.getPartitionStateSerializer()); - } - - return partitionableListState; + /** + * @deprecated This was deprecated as part of a refinement to the function names. + * Please use {@link #getListState(ListStateDescriptor)} instead. + */ + @Deprecated + @Override + public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception { + return getListState(stateDescriptor); } - private static <S> void deserializeStateValues( - PartitionableListState<S> stateListForName, - FSDataInputStream in, - OperatorStateHandle.StateMetaInfo metaInfo) throws IOException { - - if (null != metaInfo) { - long[] offsets = metaInfo.getOffsets(); - if (null != offsets) { - DataInputView div = new DataInputViewStreamWrapper(in); - TypeSerializer<S> serializer = stateListForName.getPartitionStateSerializer(); - for (long offset : offsets) { - in.seek(offset); - stateListForName.add(serializer.deserialize(div)); - } - } - } + /** + * @deprecated Using Java serialization for persisting state is not encouraged. + * Please use {@link #getListState(ListStateDescriptor)} instead. 
+ */ + @SuppressWarnings("unchecked") + @Deprecated + @Override + public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception { + return (ListState<T>) getListState(new ListStateDescriptor<>(stateName, javaSerializer)); } + // ------------------------------------------------------------------------------------------- + // Snapshot and restore + // ------------------------------------------------------------------------------------------- + @Override public RunnableFuture<OperatorStateHandle> snapshot( long checkpointId, @@ -358,4 +330,59 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { return partitionOffsets; } } + + private <S> ListState<S> getListState( + ListStateDescriptor<S> stateDescriptor, + OperatorStateHandle.Mode mode) throws IOException { + + Preconditions.checkNotNull(stateDescriptor); + + stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig()); + + String name = Preconditions.checkNotNull(stateDescriptor.getName()); + TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer()); + + @SuppressWarnings("unchecked") + PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name); + + if (null == partitionableListState) { + + partitionableListState = new PartitionableListState<>( + name, + partitionStateSerializer, + mode); + + registeredStates.put(name, partitionableListState); + } else { + Preconditions.checkState( + partitionableListState.getAssignmentMode().equals(mode), + "Incompatible assignment mode. Provided: " + mode + ", expected: " + + partitionableListState.getAssignmentMode()); + Preconditions.checkState( + stateDescriptor.getElementSerializer(). + canRestoreFrom(partitionableListState.getPartitionStateSerializer()), + "Incompatible type serializers. 
Provided: " + stateDescriptor.getElementSerializer() + + ", found: " + partitionableListState.getPartitionStateSerializer()); + } + + return partitionableListState; + } + + private static <S> void deserializeStateValues( + PartitionableListState<S> stateListForName, + FSDataInputStream in, + OperatorStateHandle.StateMetaInfo metaInfo) throws IOException { + + if (null != metaInfo) { + long[] offsets = metaInfo.getOffsets(); + if (null != offsets) { + DataInputView div = new DataInputViewStreamWrapper(in); + TypeSerializer<S> serializer = stateListForName.getPartitionStateSerializer(); + for (long offset : offsets) { + in.seek(offset); + stateListForName.add(serializer.deserialize(div)); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index bc446f3..5a072df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -80,10 +80,10 @@ public class OperatorStateBackendTest { ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class); ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class); - ListState<File> listState = operatorStateBackend.getOperatorState(stateDescriptor); + ListState<File> listState = operatorStateBackend.getListState(stateDescriptor); assertNotNull(listState); - ListState<String> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2); + ListState<String> listState2 = operatorStateBackend.getListState(stateDescriptor2); assertNotNull(listState2); 
assertEquals(2, operatorStateBackend.getRegisteredStateNames().size()); @@ -113,7 +113,7 @@ public class OperatorStateBackendTest { ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); - ListState<Serializable> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1); + ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1); assertNotNull(listState1); assertEquals(1, operatorStateBackend.getRegisteredStateNames().size()); Iterator<Serializable> it = listState1.get().iterator(); @@ -126,7 +126,7 @@ public class OperatorStateBackendTest { assertEquals(4711, it.next()); assertTrue(!it.hasNext()); - ListState<Serializable> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2); + ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2); assertNotNull(listState2); assertEquals(2, operatorStateBackend.getRegisteredStateNames().size()); assertTrue(!it.hasNext()); @@ -154,7 +154,7 @@ public class OperatorStateBackendTest { assertEquals(123, it.next()); assertTrue(!it.hasNext()); - ListState<Serializable> listState1b = operatorStateBackend.getOperatorState(stateDescriptor1); + ListState<Serializable> listState1b = operatorStateBackend.getListState(stateDescriptor1); assertNotNull(listState1b); listState1b.add(123); it = listState1b.get().iterator(); @@ -183,7 +183,7 @@ public class OperatorStateBackendTest { } try { - operatorStateBackend.getOperatorState(stateDescriptor3); + operatorStateBackend.getListState(stateDescriptor3); fail("Did not detect changed mode"); } catch (IllegalStateException ignored) { @@ -217,8 +217,8 @@ public class OperatorStateBackendTest { ListStateDescriptor<Serializable> 
stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); - ListState<Serializable> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1); - ListState<Serializable> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2); + ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1); + ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2); ListState<Serializable> listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3); listState1.add(42); @@ -251,8 +251,8 @@ public class OperatorStateBackendTest { assertEquals(3, operatorStateBackend.getRegisteredStateNames().size()); - listState1 = operatorStateBackend.getOperatorState(stateDescriptor1); - listState2 = operatorStateBackend.getOperatorState(stateDescriptor2); + listState1 = operatorStateBackend.getListState(stateDescriptor1); + listState2 = operatorStateBackend.getListState(stateDescriptor2); listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3); assertEquals(3, operatorStateBackend.getRegisteredStateNames().size()); http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index 589e285..a7539d1 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -132,7 +132,7 @@ public class ContinuousFileMonitoringFunction<OUT> Preconditions.checkState(this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); - this.checkpointedState = context.getOperatorStateStore().getOperatorState( + this.checkpointedState = context.getOperatorStateStore().getListState( new ListStateDescriptor<>( "file-monitoring-state", LongSerializer.INSTANCE http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index e3a9d54..e294cae 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -104,7 +104,7 @@ public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedF Preconditions.checkState(this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); - this.checkpointedState = context.getOperatorStateStore().getOperatorState( + this.checkpointedState = context.getOperatorStateStore().getListState( new ListStateDescriptor<>( "from-elements-state", IntSerializer.INSTANCE 
http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java index bdb12f3..fb1c94d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java @@ -71,7 +71,7 @@ public class StatefulSequenceSource extends RichParallelSourceFunction<Long> imp Preconditions.checkState(this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); - this.checkpointedState = context.getOperatorStateStore().getOperatorState( + this.checkpointedState = context.getOperatorStateStore().getListState( new ListStateDescriptor<>( "stateful-sequence-source-state", LongSerializer.INSTANCE http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java index 4cf79b1..ff507e3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java @@ -243,7 +243,7 @@ public class AsyncWaitOperator<IN, OUT> 
super.snapshotState(context); ListState<StreamElement> partitionableState = - getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); + getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); partitionableState.clear(); Collection<StreamElementQueueEntry<?>> values = queue.values(); @@ -269,7 +269,7 @@ public class AsyncWaitOperator<IN, OUT> public void initializeState(StateInitializationContext context) throws Exception { recoveredStreamElements = context .getOperatorStateStore() - .getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); + .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); } http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index 08fbcbe..8a5c997 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -19,9 +19,11 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import 
org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -169,8 +171,13 @@ public class StreamOperatorSnapshotRestoreTest { Assert.assertEquals(verifyRestore, context.isRestored()); - keyedState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0)); - opState = context.getOperatorStateStore().getSerializableListState("managed-op-state"); + keyedState = context + .getKeyedStateStore() + .getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0)); + + opState = context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("managed-op-state", IntSerializer.INSTANCE)); if (context.isRestored()) { // check restored raw keyed state http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java index ab4258f..b675cc5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java @@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import 
org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -139,7 +141,9 @@ public class WrappingFunctionSnapshotRestoreTest { @Override public void initializeState(FunctionInitializationContext context) throws Exception { - serializableListState = context.getOperatorStateStore().getSerializableListState("test-state"); + serializableListState = context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("test-state", IntSerializer.INSTANCE)); if (context.isRestored()) { Iterator<Integer> integers = serializableListState.get().iterator(); int act = integers.next(); http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 4e8bfd8..90f5619 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -721,7 +721,7 @@ public class OneInputStreamTaskTest extends TestLogger { public void open() throws Exception { super.open(); - ListState<Integer> partitionableState = getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR); + ListState<Integer> partitionableState = getOperatorStateBackend().getListState(TEST_DESCRIPTOR); if (numberSnapshotCalls == 0) { for (Integer v : partitionableState.get()) { @@ -742,7 +742,7 @@ public class OneInputStreamTaskTest extends TestLogger { @Override public void snapshotState(StateSnapshotContext context) throws 
Exception { ListState<Integer> partitionableState = - getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR); + getOperatorStateBackend().getListState(TEST_DESCRIPTOR); partitionableState.clear(); partitionableState.add(42); http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 875d0ed..fa05e1d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -23,8 +23,10 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; @@ -996,8 +998,9 @@ public class RescalingITCase extends TestLogger { this.counterPartitions = operatorStateStore.getBroadcastSerializableListState("counter_partitions"); } else { - this.counterPartitions = - context.getOperatorStateStore().getSerializableListState("counter_partitions"); + this.counterPartitions = context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("counter_partitions", IntSerializer.INSTANCE)); } if (context.isRestored()) {