Repository: flink
Updated Branches:
  refs/heads/master 0182141d4 -> 2ef4900aa


[FLINK-6324] [DataStream] Refine OperatorStateStore interface

This commit refines the OperatorStateStore interface by,
- deprecating Java serialization shortcuts
- rename getOperatorState to getListState

The Java serialization shortcuts are deprecated because they were
previously introduced to provide a smoother migration path from older
savepoints. However, its usage should definitely be discouraged.

Renaming to getListState is a preparation of making the names of state
access methods contain information about both its redistribution pattern
on restore and the shape of its datastructure, since the combination of
these two is orthogonal. This convention will also provide a better
naming pattern for more state access methods in the future, for example
getUnionListState. If the method name does not contain its
redistribution pattern (e.g., getListState), then it simply implies the
default repartitioning scheme (SPLIT_DISTRIBUTE).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1aab642
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1aab642
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1aab642

Branch: refs/heads/master
Commit: a1aab64231f3177954404a1d160d8932d76b2826
Parents: 0182141
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Wed Apr 19 02:19:48 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Wed Apr 19 10:32:08 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       |   6 +-
 .../api/common/state/OperatorStateStore.java    |  40 +++++-
 .../ProcTimeUnboundedNonPartitionedOver.scala   |   2 +-
 .../state/DefaultOperatorStateBackend.java      | 143 +++++++++++--------
 .../runtime/state/OperatorStateBackendTest.java |  20 +--
 .../ContinuousFileMonitoringFunction.java       |   2 +-
 .../functions/source/FromElementsFunction.java  |   2 +-
 .../source/StatefulSequenceSource.java          |   2 +-
 .../api/operators/async/AsyncWaitOperator.java  |   4 +-
 .../StreamOperatorSnapshotRestoreTest.java      |  11 +-
 .../WrappingFunctionSnapshotRestoreTest.java    |   6 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   4 +-
 .../test/checkpointing/RescalingITCase.java     |   7 +-
 13 files changed, 158 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 123c2be..4f5b283 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -109,7 +109,7 @@ public class FlinkKafkaConsumerBaseTest {
                FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, 
new LinkedMap(), false);
                OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
                TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = 
new TestingListState<>();
-               
when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+               
when(operatorStateStore.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
                consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(1, 1));
 
@@ -320,7 +320,7 @@ public class FlinkKafkaConsumerBaseTest {
 
                OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
                listState = new TestingListState<>();
-               
when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+               
when(operatorStateStore.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
                // create 500 snapshots
                for (int i = 100; i < 600; i++) {
@@ -444,7 +444,7 @@ public class FlinkKafkaConsumerBaseTest {
 
                OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
                listState = new TestingListState<>();
-               
when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+               
when(operatorStateStore.getListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
                // create 500 snapshots
                for (int i = 100; i < 600; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index c1cdfe4..8be04fc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -41,6 +41,34 @@ public interface OperatorStateStore {
         * @return A list for all state partitions.
         * @throws Exception
         */
+       <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) 
throws Exception;
+
+       /**
+        * Returns a set with the names of all currently registered states.
+        * @return set of names for all registered states.
+        */
+       Set<String> getRegisteredStateNames();
+
+       // 
-------------------------------------------------------------------------------------------
+       //  Deprecated methods
+       // 
-------------------------------------------------------------------------------------------
+
+       /**
+        * Creates (or restores) a list state. Each state is registered under a 
unique name.
+        * The provided serializer is used to de/serialize the state in case of 
checkpointing (snapshot/restore).
+        *
+        * The items in the list are repartitionable by the system in case of 
changed operator parallelism.
+        *
+        * @param stateDescriptor The descriptor for this state, providing a 
name and serializer.
+        * @param <S> The generic type of the state
+        *
+        * @return A list for all state partitions.
+        * @throws Exception
+        *
+        * @deprecated since 1.3.0. This was deprecated as part of a refinement 
to the function names.
+        *             Please use {@link #getListState(ListStateDescriptor)} 
instead.
+        */
+       @Deprecated
        <S> ListState<S> getOperatorState(ListStateDescriptor<S> 
stateDescriptor) throws Exception;
 
        /**
@@ -48,17 +76,15 @@ public interface OperatorStateStore {
         * are repartitionable by the system in case of changed operator 
parallelism.
         * 
         * <p>This is a simple convenience method. For more flexibility on how 
state serialization
-        * should happen, use the {@link 
#getOperatorState(ListStateDescriptor)} method.
+        * should happen, use the {@link #getListState(ListStateDescriptor)} 
method.
         *
         * @param stateName The name of state to create
         * @return A list state using Java serialization to serialize state 
objects.
         * @throws Exception
+        *
+        * @deprecated since 1.3.0. Using Java serialization for persisting 
state is not encouraged.
+        *             Please use {@link #getListState(ListStateDescriptor)} 
instead.
         */
+       @Deprecated
        <T extends Serializable> ListState<T> getSerializableListState(String 
stateName) throws Exception;
-
-       /**
-        * Returns a set with the names of all currently registered states.
-        * @return set of names for all registered states.
-        */
-       Set<String> getRegisteredStateNames();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
index 6b9800b..75209db 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
@@ -91,6 +91,6 @@ class ProcTimeUnboundedNonPartitionedOver(
 
   override def initializeState(context: FunctionInitializationContext): Unit = 
{
     val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", 
aggregationStateType)
-    state = 
context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor)
+    state = context.getOperatorStateStore.getListState(accumulatorsDescriptor)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index ca7cb48..42d4519 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -86,15 +86,13 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                registeredStates.clear();
        }
 
-       @SuppressWarnings("unchecked")
-       @Override
-       public <T extends Serializable> ListState<T> 
getSerializableListState(String stateName) throws Exception {
-               return (ListState<T>) getOperatorState(new 
ListStateDescriptor<>(stateName, javaSerializer));
-       }
+       // 
-------------------------------------------------------------------------------------------
+       //  State access methods
+       // 
-------------------------------------------------------------------------------------------
 
        @Override
-       public <S> ListState<S> getOperatorState(ListStateDescriptor<S> 
stateDescriptor) throws IOException {
-               return getOperatorState(stateDescriptor, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
+       public <S> ListState<S> getListState(ListStateDescriptor<S> 
stateDescriptor) throws Exception {
+               return getListState(stateDescriptor, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
        }
 
        @SuppressWarnings("unchecked")
@@ -103,64 +101,38 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        }
 
        public <S> ListState<S> 
getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws 
Exception {
-               return getOperatorState(stateDescriptor, 
OperatorStateHandle.Mode.BROADCAST);
+               return getListState(stateDescriptor, 
OperatorStateHandle.Mode.BROADCAST);
        }
 
-       private <S> ListState<S> getOperatorState(
-                       ListStateDescriptor<S> stateDescriptor,
-                       OperatorStateHandle.Mode mode) throws IOException {
-
-               Preconditions.checkNotNull(stateDescriptor);
-
-               
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
-
-               String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
-               TypeSerializer<S> partitionStateSerializer = 
Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
+       // 
-------------------------------------------------------------------------------------------
+       //  Deprecated state access methods
+       // 
-------------------------------------------------------------------------------------------
 
-               @SuppressWarnings("unchecked")
-               PartitionableListState<S> partitionableListState = 
(PartitionableListState<S>) registeredStates.get(name);
-
-               if (null == partitionableListState) {
-
-                       partitionableListState = new PartitionableListState<>(
-                                       name,
-                                       partitionStateSerializer,
-                                       mode);
-
-                       registeredStates.put(name, partitionableListState);
-               } else {
-                       Preconditions.checkState(
-                                       
partitionableListState.getAssignmentMode().equals(mode),
-                                       "Incompatible assignment mode. 
Provided: " + mode + ", expected: " +
-                                                       
partitionableListState.getAssignmentMode());
-                       Preconditions.checkState(
-                                       stateDescriptor.getElementSerializer().
-                                                       
canRestoreFrom(partitionableListState.getPartitionStateSerializer()),
-                                       "Incompatible type serializers. 
Provided: " + stateDescriptor.getElementSerializer() +
-                                                       ", found: " + 
partitionableListState.getPartitionStateSerializer());
-               }
-
-               return partitionableListState;
+       /**
+        * @deprecated This was deprecated as part of a refinement to the 
function names.
+        *             Please use {@link #getListState(ListStateDescriptor)} 
instead.
+        */
+       @Deprecated
+       @Override
+       public <S> ListState<S> getOperatorState(ListStateDescriptor<S> 
stateDescriptor) throws Exception {
+               return getListState(stateDescriptor);
        }
 
-       private static <S> void deserializeStateValues(
-                       PartitionableListState<S> stateListForName,
-                       FSDataInputStream in,
-                       OperatorStateHandle.StateMetaInfo metaInfo) throws 
IOException {
-
-               if (null != metaInfo) {
-                       long[] offsets = metaInfo.getOffsets();
-                       if (null != offsets) {
-                               DataInputView div = new 
DataInputViewStreamWrapper(in);
-                               TypeSerializer<S> serializer = 
stateListForName.getPartitionStateSerializer();
-                               for (long offset : offsets) {
-                                       in.seek(offset);
-                                       
stateListForName.add(serializer.deserialize(div));
-                               }
-                       }
-               }
+       /**
+        * @deprecated Using Java serialization for persisting state is not 
encouraged.
+        *             Please use {@link #getListState(ListStateDescriptor)} 
instead.
+        */
+       @SuppressWarnings("unchecked")
+       @Deprecated
+       @Override
+       public <T extends Serializable> ListState<T> 
getSerializableListState(String stateName) throws Exception {
+               return (ListState<T>) getListState(new 
ListStateDescriptor<>(stateName, javaSerializer));
        }
 
+       // 
-------------------------------------------------------------------------------------------
+       //  Snapshot and restore
+       // 
-------------------------------------------------------------------------------------------
+
        @Override
        public RunnableFuture<OperatorStateHandle> snapshot(
                        long checkpointId,
@@ -358,4 +330,59 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                        return partitionOffsets;
                }
        }
+
+       private <S> ListState<S> getListState(
+               ListStateDescriptor<S> stateDescriptor,
+               OperatorStateHandle.Mode mode) throws IOException {
+
+               Preconditions.checkNotNull(stateDescriptor);
+
+               
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+
+               String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
+               TypeSerializer<S> partitionStateSerializer = 
Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
+
+               @SuppressWarnings("unchecked")
+               PartitionableListState<S> partitionableListState = 
(PartitionableListState<S>) registeredStates.get(name);
+
+               if (null == partitionableListState) {
+
+                       partitionableListState = new PartitionableListState<>(
+                               name,
+                               partitionStateSerializer,
+                               mode);
+
+                       registeredStates.put(name, partitionableListState);
+               } else {
+                       Preconditions.checkState(
+                               
partitionableListState.getAssignmentMode().equals(mode),
+                               "Incompatible assignment mode. Provided: " + 
mode + ", expected: " +
+                                       
partitionableListState.getAssignmentMode());
+                       Preconditions.checkState(
+                               stateDescriptor.getElementSerializer().
+                                       
canRestoreFrom(partitionableListState.getPartitionStateSerializer()),
+                               "Incompatible type serializers. Provided: " + 
stateDescriptor.getElementSerializer() +
+                                       ", found: " + 
partitionableListState.getPartitionStateSerializer());
+               }
+
+               return partitionableListState;
+       }
+
+       private static <S> void deserializeStateValues(
+               PartitionableListState<S> stateListForName,
+               FSDataInputStream in,
+               OperatorStateHandle.StateMetaInfo metaInfo) throws IOException {
+
+               if (null != metaInfo) {
+                       long[] offsets = metaInfo.getOffsets();
+                       if (null != offsets) {
+                               DataInputView div = new 
DataInputViewStreamWrapper(in);
+                               TypeSerializer<S> serializer = 
stateListForName.getPartitionStateSerializer();
+                               for (long offset : offsets) {
+                                       in.seek(offset);
+                                       
stateListForName.add(serializer.deserialize(div));
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index bc446f3..5a072df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -80,10 +80,10 @@ public class OperatorStateBackendTest {
                ListStateDescriptor<File> stateDescriptor = new 
ListStateDescriptor<>("test", File.class);
                ListStateDescriptor<String> stateDescriptor2 = new 
ListStateDescriptor<>("test2", String.class);
 
-               ListState<File> listState = 
operatorStateBackend.getOperatorState(stateDescriptor);
+               ListState<File> listState = 
operatorStateBackend.getListState(stateDescriptor);
                assertNotNull(listState);
 
-               ListState<String> listState2 = 
operatorStateBackend.getOperatorState(stateDescriptor2);
+               ListState<String> listState2 = 
operatorStateBackend.getListState(stateDescriptor2);
                assertNotNull(listState2);
 
                assertEquals(2, 
operatorStateBackend.getRegisteredStateNames().size());
@@ -113,7 +113,7 @@ public class OperatorStateBackendTest {
                ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor3 = new 
ListStateDescriptor<>("test3", new JavaSerializer<>());
-               ListState<Serializable> listState1 = 
operatorStateBackend.getOperatorState(stateDescriptor1);
+               ListState<Serializable> listState1 = 
operatorStateBackend.getListState(stateDescriptor1);
                assertNotNull(listState1);
                assertEquals(1, 
operatorStateBackend.getRegisteredStateNames().size());
                Iterator<Serializable> it = listState1.get().iterator();
@@ -126,7 +126,7 @@ public class OperatorStateBackendTest {
                assertEquals(4711, it.next());
                assertTrue(!it.hasNext());
 
-               ListState<Serializable> listState2 = 
operatorStateBackend.getOperatorState(stateDescriptor2);
+               ListState<Serializable> listState2 = 
operatorStateBackend.getListState(stateDescriptor2);
                assertNotNull(listState2);
                assertEquals(2, 
operatorStateBackend.getRegisteredStateNames().size());
                assertTrue(!it.hasNext());
@@ -154,7 +154,7 @@ public class OperatorStateBackendTest {
                assertEquals(123, it.next());
                assertTrue(!it.hasNext());
 
-               ListState<Serializable> listState1b = 
operatorStateBackend.getOperatorState(stateDescriptor1);
+               ListState<Serializable> listState1b = 
operatorStateBackend.getListState(stateDescriptor1);
                assertNotNull(listState1b);
                listState1b.add(123);
                it = listState1b.get().iterator();
@@ -183,7 +183,7 @@ public class OperatorStateBackendTest {
                }
 
                try {
-                       operatorStateBackend.getOperatorState(stateDescriptor3);
+                       operatorStateBackend.getListState(stateDescriptor3);
                        fail("Did not detect changed mode");
                } catch (IllegalStateException ignored) {
 
@@ -217,8 +217,8 @@ public class OperatorStateBackendTest {
                ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor3 = new 
ListStateDescriptor<>("test3", new JavaSerializer<>());
-               ListState<Serializable> listState1 = 
operatorStateBackend.getOperatorState(stateDescriptor1);
-               ListState<Serializable> listState2 = 
operatorStateBackend.getOperatorState(stateDescriptor2);
+               ListState<Serializable> listState1 = 
operatorStateBackend.getListState(stateDescriptor1);
+               ListState<Serializable> listState2 = 
operatorStateBackend.getListState(stateDescriptor2);
                ListState<Serializable> listState3 = 
operatorStateBackend.getBroadcastOperatorState(stateDescriptor3);
 
                listState1.add(42);
@@ -251,8 +251,8 @@ public class OperatorStateBackendTest {
 
                        assertEquals(3, 
operatorStateBackend.getRegisteredStateNames().size());
 
-                       listState1 = 
operatorStateBackend.getOperatorState(stateDescriptor1);
-                       listState2 = 
operatorStateBackend.getOperatorState(stateDescriptor2);
+                       listState1 = 
operatorStateBackend.getListState(stateDescriptor1);
+                       listState2 = 
operatorStateBackend.getListState(stateDescriptor2);
                        listState3 = 
operatorStateBackend.getBroadcastOperatorState(stateDescriptor3);
 
                        assertEquals(3, 
operatorStateBackend.getRegisteredStateNames().size());

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 589e285..a7539d1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -132,7 +132,7 @@ public class ContinuousFileMonitoringFunction<OUT>
                Preconditions.checkState(this.checkpointedState == null,
                        "The " + getClass().getSimpleName() + " has already 
been initialized.");
 
-               this.checkpointedState = 
context.getOperatorStateStore().getOperatorState(
+               this.checkpointedState = 
context.getOperatorStateStore().getListState(
                        new ListStateDescriptor<>(
                                "file-monitoring-state",
                                LongSerializer.INSTANCE

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index e3a9d54..e294cae 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -104,7 +104,7 @@ public class FromElementsFunction<T> implements 
SourceFunction<T>, CheckpointedF
                Preconditions.checkState(this.checkpointedState == null,
                        "The " + getClass().getSimpleName() + " has already 
been initialized.");
 
-               this.checkpointedState = 
context.getOperatorStateStore().getOperatorState(
+               this.checkpointedState = 
context.getOperatorStateStore().getListState(
                        new ListStateDescriptor<>(
                                "from-elements-state",
                                IntSerializer.INSTANCE

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index bdb12f3..fb1c94d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -71,7 +71,7 @@ public class StatefulSequenceSource extends 
RichParallelSourceFunction<Long> imp
                Preconditions.checkState(this.checkpointedState == null,
                        "The " + getClass().getSimpleName() + " has already 
been initialized.");
 
-               this.checkpointedState = 
context.getOperatorStateStore().getOperatorState(
+               this.checkpointedState = 
context.getOperatorStateStore().getListState(
                        new ListStateDescriptor<>(
                                "stateful-sequence-source-state",
                                LongSerializer.INSTANCE

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 4cf79b1..ff507e3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -243,7 +243,7 @@ public class AsyncWaitOperator<IN, OUT>
                super.snapshotState(context);
 
                ListState<StreamElement> partitionableState =
-                       getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
+                       getOperatorStateBackend().getListState(new 
ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
                partitionableState.clear();
 
                Collection<StreamElementQueueEntry<?>> values = queue.values();
@@ -269,7 +269,7 @@ public class AsyncWaitOperator<IN, OUT>
        public void initializeState(StateInitializationContext context) throws 
Exception {
                recoveredStreamElements = context
                        .getOperatorStateStore()
-                       .getOperatorState(new ListStateDescriptor<>(STATE_NAME, 
inStreamElementSerializer));
+                       .getListState(new ListStateDescriptor<>(STATE_NAME, 
inStreamElementSerializer));
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 08fbcbe..8a5c997 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -19,9 +19,11 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -169,8 +171,13 @@ public class StreamOperatorSnapshotRestoreTest {
 
                        Assert.assertEquals(verifyRestore, 
context.isRestored());
 
-                       keyedState = context.getKeyedStateStore().getState(new 
ValueStateDescriptor<>("managed-keyed", Integer.class, 0));
-                       opState = 
context.getOperatorStateStore().getSerializableListState("managed-op-state");
+                       keyedState = context
+                                       .getKeyedStateStore()
+                                       .getState(new 
ValueStateDescriptor<>("managed-keyed", Integer.class, 0));
+
+                       opState = context
+                                       .getOperatorStateStore()
+                                       .getListState(new 
ListStateDescriptor<>("managed-op-state", IntSerializer.INSTANCE));
 
                        if (context.isRestored()) {
                                // check restored raw keyed state

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
index ab4258f..b675cc5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -139,7 +141,9 @@ public class WrappingFunctionSnapshotRestoreTest {
 
                @Override
                public void initializeState(FunctionInitializationContext 
context) throws Exception {
-                       serializableListState = 
context.getOperatorStateStore().getSerializableListState("test-state");
+                       serializableListState = context
+                                       .getOperatorStateStore()
+                                       .getListState(new 
ListStateDescriptor<>("test-state", IntSerializer.INSTANCE));
                        if (context.isRestored()) {
                                Iterator<Integer> integers = 
serializableListState.get().iterator();
                                int act = integers.next();

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 4e8bfd8..90f5619 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -721,7 +721,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                public void open() throws Exception {
                        super.open();
 
-                       ListState<Integer> partitionableState = 
getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
+                       ListState<Integer> partitionableState = 
getOperatorStateBackend().getListState(TEST_DESCRIPTOR);
 
                        if (numberSnapshotCalls == 0) {
                                for (Integer v : partitionableState.get()) {
@@ -742,7 +742,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                @Override
                public void snapshotState(StateSnapshotContext context) throws 
Exception {
                        ListState<Integer> partitionableState =
-                                       
getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
+                                       
getOperatorStateBackend().getListState(TEST_DESCRIPTOR);
                        partitionableState.clear();
 
                        partitionableState.add(42);

http://git-wip-us.apache.org/repos/asf/flink/blob/a1aab642/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 875d0ed..fa05e1d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -23,8 +23,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -996,8 +998,9 @@ public class RescalingITCase extends TestLogger {
                                this.counterPartitions =
                                                
operatorStateStore.getBroadcastSerializableListState("counter_partitions");
                        } else {
-                               this.counterPartitions =
-                                               
context.getOperatorStateStore().getSerializableListState("counter_partitions");
+                               this.counterPartitions = context
+                                               .getOperatorStateStore()
+                                               .getListState(new 
ListStateDescriptor<>("counter_partitions", IntSerializer.INSTANCE));
                        }
 
                        if (context.isRestored()) {

Reply via email to