[FLINK-7826][QS] Add support for all types of state to the QS Client.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abc3e1c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abc3e1c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abc3e1c8

Branch: refs/heads/master
Commit: abc3e1c888ceee941d557381a8cb8a7df8af2058
Parents: 717a7dc
Author: kkloudas <kklou...@gmail.com>
Authored: Tue Oct 10 16:40:57 2017 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Wed Oct 18 09:40:16 2017 +0200

----------------------------------------------------------------------
 .../common/state/FoldingStateDescriptor.java    |   2 +-
 .../client/QueryableStateClient.java            |  57 +-
 .../client/state/ImmutableAggregatingState.java |  71 ++
 .../client/state/ImmutableFoldingState.java     |  70 ++
 .../client/state/ImmutableListState.java        |  70 ++
 .../client/state/ImmutableMapState.java         | 139 ++++
 .../client/state/ImmutableReducingState.java    |  69 ++
 .../client/state/ImmutableState.java            |  29 +
 .../client/state/ImmutableStateBinder.java      |  80 +++
 .../client/state/ImmutableValueState.java       |  69 ++
 .../network/AbstractServerHandler.java          |   2 +-
 .../itcases/AbstractQueryableStateITCase.java   | 644 +++++++++++++++----
 .../state/ImmutableAggregatingStateTest.java    | 114 ++++
 .../state/ImmutableFoldingStateTest.java        |  94 +++
 .../state/ImmutableListStateTest.java           | 112 ++++
 .../state/ImmutableMapStateTest.java            | 189 ++++++
 .../state/ImmutableReducingStateTest.java       |  84 +++
 .../state/ImmutableValueStateTest.java          |  70 ++
 .../streaming/api/datastream/KeyedStream.java   |   6 +-
 .../api/datastream/QueryableStateStream.java    |  28 +-
 .../flink/streaming/api/scala/KeyedStream.scala |   6 +-
 21 files changed, 1817 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index f7609c3..0954047 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -30,7 +30,7 @@ import static java.util.Objects.requireNonNull;
  * {@link StateDescriptor} for {@link FoldingState}. This can be used to 
create partitioned
  * folding state.
  *
- * @param <T> Type of the values folded int othe state
+ * @param <T> Type of the values folded in the other state
  * @param <ACC> Type of the value in the state
  *
  * @deprecated will be removed in a future version in favor of {@link 
AggregatingStateDescriptor}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 005c874..70bccf0 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -21,10 +21,12 @@ package org.apache.flink.queryablestate.client;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
 import org.apache.flink.queryablestate.messages.KvStateRequest;
 import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.Client;
@@ -141,15 +143,15 @@ public class QueryableStateClient {
         * @param key                               The key we are interested 
in.
         * @param keyTypeHint                           A {@link TypeHint} used 
to extract the type of the key.
         * @param stateDescriptor                       The {@link 
StateDescriptor} of the state we want to query.
-        * @return Future holding the result.
+        * @return Future holding the immutable {@link State} object containing 
the result.
         */
        @PublicEvolving
-       public <K, V> CompletableFuture<V> getKvState(
+       public <K, S extends State, V> CompletableFuture<S> getKvState(
                        final JobID jobId,
                        final String queryableStateName,
                        final K key,
                        final TypeHint<K> keyTypeHint,
-                       final StateDescriptor<?, V> stateDescriptor) {
+                       final StateDescriptor<S, V> stateDescriptor) {
 
                Preconditions.checkNotNull(keyTypeHint);
 
@@ -164,15 +166,15 @@ public class QueryableStateClient {
         * @param key                               The key we are interested 
in.
         * @param keyTypeInfo                           The {@link 
TypeInformation} of the key.
         * @param stateDescriptor                       The {@link 
StateDescriptor} of the state we want to query.
-        * @return Future holding the result.
+        * @return Future holding the immutable {@link State} object containing 
the result.
         */
        @PublicEvolving
-       public <K, V> CompletableFuture<V> getKvState(
+       public <K, S extends State, V> CompletableFuture<S> getKvState(
                        final JobID jobId,
                        final String queryableStateName,
                        final K key,
                        final TypeInformation<K> keyTypeInfo,
-                       final StateDescriptor<?, V> stateDescriptor) {
+                       final StateDescriptor<S, V> stateDescriptor) {
 
                return getKvState(jobId, queryableStateName, key, 
VoidNamespace.INSTANCE,
                                keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor);
@@ -187,48 +189,17 @@ public class QueryableStateClient {
         * @param keyTypeInfo                           The {@link 
TypeInformation} of the keys.
         * @param namespaceTypeInfo                     The {@link 
TypeInformation} of the namespace.
         * @param stateDescriptor                       The {@link 
StateDescriptor} of the state we want to query.
-        * @return Future holding the result.
+        * @return Future holding the immutable {@link State} object containing 
the result.
         */
        @PublicEvolving
-       public <K, V, N> CompletableFuture<V> getKvState(
+       public <K, N, S extends State, V> CompletableFuture<S> getKvState(
                        final JobID jobId,
                        final String queryableStateName,
                        final K key,
                        final N namespace,
                        final TypeInformation<K> keyTypeInfo,
                        final TypeInformation<N> namespaceTypeInfo,
-                       final StateDescriptor<?, V> stateDescriptor) {
-
-               Preconditions.checkNotNull(stateDescriptor);
-
-               // initialize the value serializer based on the execution 
config.
-               stateDescriptor.initializeSerializerUnlessSet(executionConfig);
-               TypeSerializer<V> stateSerializer = 
stateDescriptor.getSerializer();
-
-               return getKvState(jobId, queryableStateName, key,
-                               namespace, keyTypeInfo, namespaceTypeInfo, 
stateSerializer);
-       }
-
-       /**
-        * Returns a future holding the request result.
-        * @param jobId                     JobID of the job the queryable 
state belongs to.
-        * @param queryableStateName        Name under which the state is 
queryable.
-        * @param key                               The key that the state we 
request is associated with.
-        * @param namespace                                     The namespace 
of the state.
-        * @param keyTypeInfo                           The {@link 
TypeInformation} of the keys.
-        * @param namespaceTypeInfo                     The {@link 
TypeInformation} of the namespace.
-        * @param stateSerializer                       The {@link 
TypeSerializer} of the state we want to query.
-        * @return Future holding the result.
-        */
-       @PublicEvolving
-       public <K, N, V> CompletableFuture<V> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final K key,
-                       final N namespace,
-                       final TypeInformation<K> keyTypeInfo,
-                       final TypeInformation<N> namespaceTypeInfo,
-                       final TypeSerializer<V> stateSerializer) {
+                       final StateDescriptor<S, V> stateDescriptor) {
 
                Preconditions.checkNotNull(jobId);
                Preconditions.checkNotNull(queryableStateName);
@@ -237,7 +208,7 @@ public class QueryableStateClient {
 
                Preconditions.checkNotNull(keyTypeInfo);
                Preconditions.checkNotNull(namespaceTypeInfo);
-               Preconditions.checkNotNull(stateSerializer);
+               Preconditions.checkNotNull(stateDescriptor);
 
                TypeSerializer<K> keySerializer = 
keyTypeInfo.createSerializer(executionConfig);
                TypeSerializer<N> namespaceSerializer = 
namespaceTypeInfo.createSerializer(executionConfig);
@@ -253,8 +224,8 @@ public class QueryableStateClient {
                return getKvState(jobId, queryableStateName, key.hashCode(), 
serializedKeyAndNamespace).thenApply(
                                stateResponse -> {
                                        try {
-                                               return 
KvStateSerializer.deserializeValue(stateResponse.getContent(), stateSerializer);
-                                       } catch (IOException e) {
+                                               return stateDescriptor.bind(new 
ImmutableStateBinder(stateResponse.getContent()));
+                                       } catch (Exception e) {
                                                throw new 
FlinkRuntimeException(e);
                                        }
                                });

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
new file mode 100644
index 0000000..b853cfc
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A read-only {@link AggregatingState} that <b>does not</b> allow for 
modifications.
+ *
+ * <p>This is the type of the result returned when querying Flink's keyed 
state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
+ * providing an {@link AggregatingStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState 
implements AggregatingState<IN, OUT> {
+
+       private final OUT value;
+
+       private ImmutableAggregatingState(OUT value) {
+               this.value = Preconditions.checkNotNull(value);
+       }
+
+       @Override
+       public OUT get() {
+               return value;
+       }
+
+       @Override
+       public void add(Object newValue) {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       @Override
+       public void clear() {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> 
createState(
+                       final AggregatingStateDescriptor<IN, ACC, OUT> 
stateDescriptor,
+                       final byte[] serializedValue) throws IOException {
+
+               final ACC accumulator = KvStateSerializer.deserializeValue(
+                               serializedValue,
+                               stateDescriptor.getSerializer());
+
+               final OUT state = 
stateDescriptor.getAggregateFunction().getResult(accumulator);
+               return new ImmutableAggregatingState<>(state);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
new file mode 100644
index 0000000..a12adaa
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A read-only {@link FoldingState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
+ * providing an {@link FoldingStateDescriptor}.
+ */
+@PublicEvolving
+@Deprecated
+public final class ImmutableFoldingState<IN, ACC> extends ImmutableState 
implements FoldingState<IN, ACC> {
+
+       private final ACC value;
+
+       private ImmutableFoldingState(ACC value) {
+               this.value = Preconditions.checkNotNull(value);
+       }
+
+       @Override
+       public ACC get() {
+               return value;
+       }
+
+       @Override
+       public void add(Object newValue) {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       @Override
+       public void clear() {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
+                       final FoldingStateDescriptor<IN, ACC> stateDescriptor,
+                       final byte[] serializedState) throws IOException {
+
+               final ACC state = KvStateSerializer.deserializeValue(
+                               serializedState,
+                               stateDescriptor.getSerializer());
+               return new ImmutableFoldingState<>(state);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
new file mode 100644
index 0000000..8416905
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A read-only {@link ListState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
+ * providing an {@link ListStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableListState<V> extends ImmutableState implements 
ListState<V> {
+
+       private final List<V> listState;
+
+       private ImmutableListState(final List<V> state) {
+               this.listState = Preconditions.checkNotNull(state);
+       }
+
+       @Override
+       public Iterable<V> get() {
+               return listState;
+       }
+
+       @Override
+       public void add(V value) {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       @Override
+       public void clear() {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       public static <V> ImmutableListState<V> createState(
+                       final ListStateDescriptor<V> stateDescriptor,
+                       final byte[] serializedState) throws IOException {
+
+               final List<V> state = KvStateSerializer.deserializeList(
+                               serializedState,
+                               stateDescriptor.getElementSerializer());
+               return new ImmutableListState<>(state);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
new file mode 100644
index 0000000..c216d5d
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A read-only {@link MapState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
+ * providing an {@link MapStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableMapState<K, V> extends ImmutableState implements 
MapState<K, V> {
+
+       private final Map<K, V> state;
+
+       private ImmutableMapState(final Map<K, V> mapState) {
+               this.state = Preconditions.checkNotNull(mapState);
+       }
+
+       @Override
+       public V get(K key) {
+               return state.get(key);
+       }
+
+       @Override
+       public void put(K key, V value) {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       @Override
+       public void putAll(Map<K, V> map) {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       @Override
+       public void remove(K key) {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       @Override
+       public boolean contains(K key) {
+               return state.containsKey(key);
+       }
+
+       /**
+        * Returns all the mappings in the state in a {@link 
Collections#unmodifiableSet(Set)}.
+        *
+        * @return A read-only iterable view of all the key-value pairs in the 
state.
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       @Override
+       public Iterable<Map.Entry<K, V>> entries() {
+               return Collections.unmodifiableSet(state.entrySet());
+       }
+
+       /**
+        * Returns all the keys in the state in a {@link 
Collections#unmodifiableSet(Set)}.
+        *
+        * @return A read-only iterable view of all the keys in the state.
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       @Override
+       public Iterable<K> keys() {
+               return Collections.unmodifiableSet(state.keySet());
+       }
+
+       /**
+        * Returns all the values in the state in a {@link 
Collections#unmodifiableCollection(Collection)}.
+        *
+        * @return A read-only iterable view of all the values in the state.
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       @Override
+       public Iterable<V> values() {
+               return Collections.unmodifiableCollection(state.values());
+       }
+
+       /**
+        * Iterates over all the mappings in the state. The iterator cannot
+        * remove elements.
+        *
+        * @return A read-only iterator over all the mappings in the state
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       @Override
+       public Iterator<Map.Entry<K, V>> iterator() {
+               return Collections.unmodifiableSet(state.entrySet()).iterator();
+       }
+
+       @Override
+       public void clear() {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       public static <K, V> ImmutableMapState<K, V> createState(
+                       final MapStateDescriptor<K, V> stateDescriptor,
+                       final byte[] serializedState) throws IOException {
+
+               final Map<K, V> state = KvStateSerializer.deserializeMap(
+                               serializedState,
+                               stateDescriptor.getKeySerializer(),
+                               stateDescriptor.getValueSerializer());
+               return new ImmutableMapState<>(state);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
new file mode 100644
index 0000000..da08c53
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A read-only {@link ReducingState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
+ * providing an {@link ReducingStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableReducingState<V> extends ImmutableState implements 
ReducingState<V> {
+
+       private final V value;
+
+       private ImmutableReducingState(V value) {
+               this.value = Preconditions.checkNotNull(value);
+       }
+
+       @Override
+       public V get() {
+               return value;
+       }
+
+       @Override
+       public void add(V newValue) {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       @Override
+       public void clear() {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       public static <V> ImmutableReducingState<V> createState(
+                       final ReducingStateDescriptor<V> stateDescriptor,
+                       final byte[] serializedState) throws IOException {
+
+               final V state = KvStateSerializer.deserializeValue(
+                               serializedState,
+                               stateDescriptor.getSerializer());
+               return new ImmutableReducingState<>(state);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
new file mode 100644
index 0000000..863f07b
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+/**
+ * A base class for the <b>read-only</b> types of state returned
+ * as results from the Queryable State Client.
+ */
+abstract class ImmutableState {
+
+       protected static final UnsupportedOperationException 
MODIFICATION_ATTEMPT_ERROR =
+                       new UnsupportedOperationException("State is read-only. 
No modifications allowed.");
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
new file mode 100644
index 0000000..6ce2787
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateBinder;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link StateBinder} used to deserialize the results returned by the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient}.
+ *
+ * <p>The result is an immutable {@link 
org.apache.flink.api.common.state.State State}
+ * object containing the requested result.
+ */
+public class ImmutableStateBinder implements StateBinder {
+
+       private final byte[] serializedState;
+
+       public ImmutableStateBinder(final byte[] content) {
+               serializedState = Preconditions.checkNotNull(content);
+       }
+
+       @Override
+       public <T> ValueState<T> createValueState(ValueStateDescriptor<T> 
stateDesc) throws Exception {
+               return ImmutableValueState.createState(stateDesc, 
serializedState);
+       }
+
+       @Override
+       public <T> ListState<T> createListState(ListStateDescriptor<T> 
stateDesc) throws Exception {
+               return ImmutableListState.createState(stateDesc, 
serializedState);
+       }
+
+       @Override
+       public <T> ReducingState<T> 
createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
+               return ImmutableReducingState.createState(stateDesc, 
serializedState);
+       }
+
+       @Override
+       public <IN, ACC, OUT> AggregatingState<IN, OUT> 
createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) 
throws Exception {
+               return ImmutableAggregatingState.createState(stateDesc, 
serializedState);
+       }
+
+       @Override
+       public <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+               return ImmutableFoldingState.createState(stateDesc, 
serializedState);
+       }
+
+       @Override
+       public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, 
MV> stateDesc) throws Exception {
+               return ImmutableMapState.createState(stateDesc, 
serializedState);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
new file mode 100644
index 0000000..7fd6457
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A read-only {@link ValueState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
+ * providing an {@link ValueStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableValueState<V> extends ImmutableState implements 
ValueState<V> {
+
+       private final V value;
+
+       private ImmutableValueState(V value) {
+               this.value = Preconditions.checkNotNull(value);
+       }
+
+       @Override
+       public V value() {
+               return value;
+       }
+
+       @Override
+       public void update(V newValue) {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       @Override
+       public void clear() {
+               throw MODIFICATION_ATTEMPT_ERROR;
+       }
+
+       public static <V> ImmutableValueState<V> createState(
+                       final ValueStateDescriptor<V> stateDescriptor,
+                       final byte[] serializedState) throws IOException {
+
+               final V state = KvStateSerializer.deserializeValue(
+                               serializedState,
+                               stateDescriptor.getSerializer());
+               return new ImmutableValueState<>(state);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index b9bf671..18a88da 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -121,7 +121,7 @@ public abstract class AbstractServerHandler<REQ extends 
MessageBody, RESP extend
                                // Execute actual query async, because it is 
possibly
                                // blocking (e.g. file I/O).
                                //
-                               // A submission failure is not treated as 
fatal. todo here if there is a shared resource e.g. registry, then I will have 
to sync on that.
+                               // A submission failure is not treated as fatal.
                                queryExecutor.submit(new 
AsyncRequestTask<>(this, ctx, requestId, request, stats));
 
                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
index a096f55..69316fa 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
@@ -19,17 +19,27 @@
 package org.apache.flink.queryablestate.itcases;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -49,12 +59,18 @@ import 
org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -63,13 +79,16 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
@@ -198,7 +217,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        while (!allNonZero && deadline.hasTimeLeft()) {
                                allNonZero = true;
 
-                               final List<CompletableFuture<Tuple2<Integer, 
Long>>> futures = new ArrayList<>(numKeys);
+                               final 
List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new 
ArrayList<>(numKeys);
 
                                for (int i = 0; i < numKeys; i++) {
                                        final int key = i;
@@ -210,7 +229,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                                                allNonZero = false;
                                        }
 
-                                       CompletableFuture<Tuple2<Integer, 
Long>> result = getKvStateWithRetries(
+                                       
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = 
getKvStateWithRetries(
                                                        client,
                                                        jobId,
                                                        queryName,
@@ -221,9 +240,14 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                                                        false,
                                                        executor);
 
-                                       result.thenAccept(res -> {
-                                               counts.set(key, res.f1);
-                                               assertEquals("Key mismatch", 
key, res.f0.intValue());
+                                       result.thenAccept(response -> {
+                                               try {
+                                                       Tuple2<Integer, Long> 
res = response.get();
+                                                       counts.set(key, res.f1);
+                                                       assertEquals("Key 
mismatch", key, res.f0.intValue());
+                                               } catch (Exception e) {
+                                                       
Assert.fail(e.getMessage());
+                                               }
                                        });
 
                                        futures.add(result);
@@ -406,7 +430,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                        cluster.submitJobDetached(jobGraph);
 
-                       executeQuery(deadline, client, jobId, "hakuna", 
valueState, numElements);
+                       executeValueQuery(deadline, client, jobId, "hakuna", 
valueState, numElements);
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
@@ -485,7 +509,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                        cluster.submitJobDetached(jobGraph);
 
-                       executeQuery(deadline, client, jobId, "hakuna", 
valueState, expected);
+                       executeValueQuery(deadline, client, jobId, "hakuna", 
valueState, expected);
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
@@ -502,87 +526,6 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
        }
 
        /**
-        * Retry a query for state for keys between 0 and {@link 
#maxParallelism} until
-        * <tt>expected</tt> equals the value of the result tuple's second 
field.
-        */
-       private void executeQuery(
-                       final Deadline deadline,
-                       final QueryableStateClient client,
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final StateDescriptor<?, Tuple2<Integer, Long>> 
stateDescriptor,
-                       final long expected) throws Exception {
-
-               for (int key = 0; key < maxParallelism; key++) {
-                       boolean success = false;
-                       while (deadline.hasTimeLeft() && !success) {
-                               CompletableFuture<Tuple2<Integer, Long>> future 
= getKvStateWithRetries(
-                                               client,
-                                               jobId,
-                                               queryableStateName,
-                                               key,
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               stateDescriptor,
-                                               QUERY_RETRY_DELAY,
-                                               false,
-                                               executor);
-
-                               Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-                               assertEquals("Key mismatch", key, 
value.f0.intValue());
-                               if (expected == value.f1) {
-                                       success = true;
-                               } else {
-                                       // Retry
-                                       Thread.sleep(50L);
-                               }
-                       }
-
-                       assertTrue("Did not succeed query", success);
-               }
-       }
-
-       /**
-        * Retry a query for state for keys between 0 and {@link 
#maxParallelism} until
-        * <tt>expected</tt> equals the value of the result tuple's second 
field.
-        */
-       private void executeQuery(
-                       final Deadline deadline,
-                       final QueryableStateClient client,
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final TypeSerializer<Tuple2<Integer, Long>> 
valueSerializer,
-                       final long expected) throws Exception {
-
-               for (int key = 0; key < maxParallelism; key++) {
-                       boolean success = false;
-                       while (deadline.hasTimeLeft() && !success) {
-                               Future<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(client,
-                                               jobId,
-                                               queryableStateName,
-                                               key,
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               valueSerializer,
-                                               QUERY_RETRY_DELAY,
-                                               false,
-                                               executor);
-
-                               Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-                               assertEquals("Key mismatch", key, 
value.f0.intValue());
-                               if (expected == value.f1) {
-                                       success = true;
-                               } else {
-                                       // Retry
-                                       Thread.sleep(50L);
-                               }
-                       }
-
-                       assertTrue("Did not succeed query", success);
-               }
-       }
-
-       /**
         * Tests simple value state queryable state instance with a default 
value
         * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
         * tuples, the key is mapped to 1 but key 0 is queried which should 
throw
@@ -647,7 +590,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                        // Now query
                        int key = 0;
-                       CompletableFuture<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(
+                       CompletableFuture<ValueState<Tuple2<Integer, Long>>> 
future = getKvStateWithRetries(
                                        client,
                                        jobId,
                                        queryableState.getQueryableStateName(),
@@ -730,8 +673,9 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                        cluster.submitJobDetached(jobGraph);
 
-                       executeQuery(deadline, client, jobId, "matata",
-                                       queryableState.getValueSerializer(), 
numElements);
+                       final ValueStateDescriptor<Tuple2<Integer, Long>> 
stateDesc =
+                                       (ValueStateDescriptor<Tuple2<Integer, 
Long>>) queryableState.getStateDescriptor();
+                       executeValueQuery(deadline, client, jobId, "matata", 
stateDesc, numElements);
                } finally {
 
                        // Free cluster resources
@@ -809,10 +753,10 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                        for (int key = 0; key < maxParallelism; key++) {
                                boolean success = false;
                                while (deadline.hasTimeLeft() && !success) {
-                                       CompletableFuture<String> future = 
getKvStateWithRetries(
+                                       
CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = 
getKvStateWithRetries(
                                                        client,
                                                        jobId,
-                                                       
queryableState.getQueryableStateName(),
+                                                       "pumba",
                                                        key,
                                                        
BasicTypeInfo.INT_TYPE_INFO,
                                                        foldingState,
@@ -820,7 +764,9 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                                                        false,
                                                        executor);
 
-                                       String value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                                       String value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                                       //assertEquals("Key mismatch", key, 
value.f0.intValue());
                                        if (expected.equals(value)) {
                                                success = true;
                                        } else {
@@ -898,12 +844,150 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                        cluster.submitJobDetached(jobGraph);
 
-                       // Wait until job is running
+                       // Now query
+                       long expected = numElements * (numElements + 1L) / 2L;
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               boolean success = false;
+                               while (deadline.hasTimeLeft() && !success) {
+                                       
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = 
getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       "jungle",
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       reducingState,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                                       assertEquals("Key mismatch", key, 
value.f0.intValue());
+                                       if (expected == value.f1) {
+                                               success = true;
+                                       } else {
+                                               // Retry
+                                               Thread.sleep(50L);
+                                       }
+                               }
+
+                               assertTrue("Did not succeed query", success);
+                       }
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+
+                       client.shutdown();
+               }
+       }
+
+       /**
+        * Tests simple map state queryable state instance. Each source emits
+        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+        * queried. The map state instance sums the values up. The test succeeds
+        * after each subtask index is queried with result n*(n+1)/2.
+        */
+       @Test
+       public void testMapState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       final MapStateDescriptor<Integer, Tuple2<Integer, 
Long>> mapStateDescriptor = new MapStateDescriptor<>(
+                                       "timon",
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       source.getType());
+                       mapStateDescriptor.setQueryable("timon-queryable");
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
8470749712274833552L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).process(new ProcessFunction<Tuple2<Integer, Long>, 
Object>() {
+                               private static final long serialVersionUID = 
-805125545438296619L;
+
+                               private transient MapState<Integer, 
Tuple2<Integer, Long>> mapState;
+
+                               @Override
+                               public void open(Configuration parameters) 
throws Exception {
+                                       super.open(parameters);
+                                       mapState = 
getRuntimeContext().getMapState(mapStateDescriptor);
+                               }
+
+                               @Override
+                               public void processElement(Tuple2<Integer, 
Long> value, Context ctx, Collector<Object> out) throws Exception {
+                                       Tuple2<Integer, Long> v = 
mapState.get(value.f0);
+                                       if (v == null) {
+                                               v = new Tuple2<>(value.f0, 0L);
+                                       }
+                                       mapState.put(value.f0, new 
Tuple2<>(v.f0, v.f1 + value.f1));
+                               }
+                       });
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
 
                        // Now query
                        long expected = numElements * (numElements + 1L) / 2L;
 
-                       executeQuery(deadline, client, jobId, "jungle", 
reducingState, expected);
+                       for (int key = 0; key < maxParallelism; key++) {
+                               boolean success = false;
+                               while (deadline.hasTimeLeft() && !success) {
+                                       CompletableFuture<MapState<Integer, 
Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       "timon-queryable",
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       mapStateDescriptor,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);
+                                       assertEquals("Key mismatch", key, 
value.f0.intValue());
+                                       if (expected == value.f1) {
+                                               success = true;
+                                       } else {
+                                               // Retry
+                                               Thread.sleep(50L);
+                                       }
+                               }
+
+                               assertTrue("Did not succeed query", success);
+                       }
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
@@ -920,6 +1004,227 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
        }
 
        /**
+        * Tests simple list state queryable state instance. Each source emits
+        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+        * queried. The list state instance add the values to the list. The test
+        * succeeds after each subtask index is queried and the list contains
+        * the correct number of distinct elements.
+        */
+       @Test
+       public void testListState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       final ListStateDescriptor<Long> listStateDescriptor = 
new ListStateDescriptor<Long>(
+                                       "list",
+                                       BasicTypeInfo.LONG_TYPE_INFO);
+                       listStateDescriptor.setQueryable("list-queryable");
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
8470749712274833552L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).process(new ProcessFunction<Tuple2<Integer, Long>, 
Object>() {
+                               private static final long serialVersionUID = 
-805125545438296619L;
+
+                               private transient ListState<Long> listState;
+
+                               @Override
+                               public void open(Configuration parameters) 
throws Exception {
+                                       super.open(parameters);
+                                       listState = 
getRuntimeContext().getListState(listStateDescriptor);
+                               }
+
+                               @Override
+                               public void processElement(Tuple2<Integer, 
Long> value, Context ctx, Collector<Object> out) throws Exception {
+                                       listState.add(value.f1);
+                               }
+                       });
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       // Now query
+
+                       Map<Integer, Set<Long>> results = new HashMap<>();
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               boolean success = false;
+                               while (deadline.hasTimeLeft() && !success) {
+                                       CompletableFuture<ListState<Long>> 
future = getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       "list-queryable",
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       listStateDescriptor,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       Iterable<Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                                       Set<Long> res = new HashSet<>();
+                                       for (Long v: value) {
+                                               res.add(v);
+                                       }
+
+                                       // the source starts at 0, so +1
+                                       if (res.size() == numElements + 1L) {
+                                               success = true;
+                                               results.put(key, res);
+                                       } else {
+                                               // Retry
+                                               Thread.sleep(50L);
+                                       }
+                               }
+
+                               assertTrue("Did not succeed query", success);
+                       }
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               Set<Long> values = results.get(key);
+                               for (long i = 0L; i <= numElements; i++) {
+                                       assertTrue(values.contains(i));
+                               }
+                       }
+
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+
+                       client.shutdown();
+               }
+       }
+
+       @Test
+       public void testAggregatingState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       final AggregatingStateDescriptor<Tuple2<Integer, Long>, 
MutableString, String> aggrStateDescriptor =
+                                       new AggregatingStateDescriptor<>(
+                                                       "aggregates",
+                                                       new SumAggr(),
+                                                       MutableString.class);
+                       aggrStateDescriptor.setQueryable("aggr-queryable");
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
8470749712274833552L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).transform(
+                                       "TestAggregatingOperator",
+                                       BasicTypeInfo.STRING_TYPE_INFO,
+                                       new 
AggregatingTestOperator(aggrStateDescriptor)
+                       );
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       // Now query
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               boolean success = false;
+                               while (deadline.hasTimeLeft() && !success) {
+                                       
CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = 
getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       "aggr-queryable",
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       aggrStateDescriptor,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       String value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                                       if (Long.parseLong(value) == 
numElements * (numElements + 1L) / 2L) {
+                                               success = true;
+                                       } else {
+                                               // Retry
+                                               Thread.sleep(50L);
+                                       }
+                               }
+
+                               assertTrue("Did not succeed query", success);
+                       }
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+
+                       client.shutdown();
+               }
+       }
+
+       /////                           Sources/UDFs Used in the Tests          
        //////
+
+       /**
         * Test source producing (key, 0)..(key, maxValue) with key being the 
sub
         * task index.
         *
@@ -980,8 +1285,8 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
        /**
         * Test source producing (key, 1) tuples with random key in key range 
(numKeys).
         */
-       protected static class TestKeyRangeSource extends 
RichParallelSourceFunction<Tuple2<Integer, Long>>
-                       implements CheckpointListener {
+       private static class TestKeyRangeSource extends 
RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener 
{
+
                private static final long serialVersionUID = 
-5744725196953582710L;
 
                private static final AtomicLong LATEST_CHECKPOINT_ID = new 
AtomicLong();
@@ -997,7 +1302,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-                               LATEST_CHECKPOINT_ID.set(0);
+                               LATEST_CHECKPOINT_ID.set(0L);
                        }
                }
 
@@ -1012,7 +1317,7 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                                        ctx.collect(record);
                                }
                                // mild slow down
-                               Thread.sleep(1);
+                               Thread.sleep(1L);
                        }
                }
 
@@ -1030,6 +1335,77 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
        }
 
        /**
+        * An operator that uses {@link AggregatingState}.
+        *
+        * <p>The operator exists for lack of possibility to get an
+        * {@link AggregatingState} from the {@link 
org.apache.flink.api.common.functions.RuntimeContext}.
+        * If this were not the case, we could have a {@link ProcessFunction}.
+        */
+       private static class AggregatingTestOperator
+                       extends AbstractStreamOperator<String>
+                       implements OneInputStreamOperator<Tuple2<Integer, 
Long>, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final AggregatingStateDescriptor<Tuple2<Integer, Long>, 
MutableString, String> stateDescriptor;
+               private transient AggregatingState<Tuple2<Integer, Long>, 
String> state;
+
+               
AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, 
MutableString, String> stateDesc) {
+                       this.stateDescriptor = stateDesc;
+               }
+
+               @Override
+               public void open() throws Exception {
+                       super.open();
+                       this.state = getKeyedStateBackend().getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       stateDescriptor);
+               }
+
+               @Override
+               public void processElement(StreamRecord<Tuple2<Integer, Long>> 
element) throws Exception {
+                       state.add(element.getValue());
+               }
+       }
+
+       /**
+        * Test {@link AggregateFunction} concatenating the already stored 
string with the long passed as argument.
+        */
+       private static class SumAggr implements 
AggregateFunction<Tuple2<Integer, Long>, MutableString, String> {
+
+               private static final long serialVersionUID = 
-6249227626701264599L;
+
+               @Override
+               public MutableString createAccumulator() {
+                       return new MutableString();
+               }
+
+               @Override
+               public void add(Tuple2<Integer, Long> value, MutableString 
accumulator) {
+                       long acc = Long.valueOf(accumulator.value);
+                       acc += value.f1;
+                       accumulator.value = Long.toString(acc);
+               }
+
+               @Override
+               public String getResult(MutableString accumulator) {
+                       return accumulator.value;
+               }
+
+               @Override
+               public MutableString merge(MutableString a, MutableString b) {
+                       MutableString nValue = new MutableString();
+                       nValue.value = Long.toString(Long.valueOf(a.value) + 
Long.valueOf(b.value));
+                       return nValue;
+               }
+       }
+
+       private static final class MutableString {
+               String value = "0";
+       }
+
+       /**
         * Test {@link FoldFunction} concatenating the already stored string 
with the long passed as argument.
         */
        private static class SumFold implements FoldFunction<Tuple2<Integer, 
Long>, String> {
@@ -1058,32 +1434,13 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
        /////                           General Utility Methods                 
        //////
 
-       private static <K, V> Future<V> getKvStateWithRetries(
-                       final QueryableStateClient client,
-                       final JobID jobId,
-                       final String queryName,
-                       final K key,
-                       final TypeInformation<K> keyTypeInfo,
-                       final TypeSerializer<V> valueTypeSerializer,
-                       final Time retryDelay,
-                       final boolean failForUnknownKeyOrNamespace,
-                       final ScheduledExecutor executor) {
-
-               return retryWithDelay(
-                               () -> client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
valueTypeSerializer),
-                               NO_OF_RETRIES,
-                               retryDelay,
-                               executor,
-                               failForUnknownKeyOrNamespace);
-       }
-
-       private static <K, V> CompletableFuture<V> getKvStateWithRetries(
+       private static <K, S extends State, V> CompletableFuture<S> 
getKvStateWithRetries(
                        final QueryableStateClient client,
                        final JobID jobId,
                        final String queryName,
                        final K key,
                        final TypeInformation<K> keyTypeInfo,
-                       final StateDescriptor<?, V> stateDescriptor,
+                       final StateDescriptor<S, V> stateDescriptor,
                        final Time retryDelay,
                        final boolean failForUnknownKeyOrNamespace,
                        final ScheduledExecutor executor) {
@@ -1157,4 +1514,45 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                                        (t, throwable) -> 
operationResultFuture.cancel(false));
                }
        }
+
+       /**
+        * Retry a query for state for keys between 0 and {@link 
#maxParallelism} until
+        * <tt>expected</tt> equals the value of the result tuple's second 
field.
+        */
+       private void executeValueQuery(
+                       final Deadline deadline,
+                       final QueryableStateClient client,
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final ValueStateDescriptor<Tuple2<Integer, Long>> 
stateDescriptor,
+                       final long expected) throws Exception {
+
+               for (int key = 0; key < maxParallelism; key++) {
+                       boolean success = false;
+                       while (deadline.hasTimeLeft() && !success) {
+                               CompletableFuture<ValueState<Tuple2<Integer, 
Long>>> future = getKvStateWithRetries(
+                                               client,
+                                               jobId,
+                                               queryableStateName,
+                                               key,
+                                               BasicTypeInfo.INT_TYPE_INFO,
+                                               stateDescriptor,
+                                               QUERY_RETRY_DELAY,
+                                               false,
+                                               executor);
+
+                               Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).value();
+
+                               assertEquals("Key mismatch", key, 
value.f0.intValue());
+                               if (expected == value.f1) {
+                                       success = true;
+                               } else {
+                                       // Retry
+                                       Thread.sleep(50L);
+                               }
+                       }
+
+                       assertTrue("Did not succeed query", success);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
new file mode 100644
index 0000000..69b2f61
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableAggregatingStateTest}.
+ */
+public class ImmutableAggregatingStateTest {
+
+       private final AggregatingStateDescriptor<Long, MutableString, String> 
aggrStateDesc =
+                       new AggregatingStateDescriptor<>(
+                                       "test",
+                                       new SumAggr(),
+                                       MutableString.class);
+
+       private ImmutableAggregatingState<Long, String> aggrState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!aggrStateDesc.isSerializerInitialized()) {
+                       aggrStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               final MutableString initValue = new MutableString();
+               initValue.value = "42";
+
+               ByteArrayOutputStream out = new ByteArrayOutputStream();
+               aggrStateDesc.getSerializer().serialize(initValue, new 
DataOutputViewStreamWrapper(out));
+
+               aggrState = ImmutableAggregatingState.createState(
+                               aggrStateDesc,
+                               out.toByteArray()
+               );
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               String value = aggrState.get();
+               assertEquals("42", value);
+
+               aggrState.add(54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               String value = aggrState.get();
+               assertEquals("42", value);
+
+               aggrState.clear();
+       }
+
+       /**
+        * Test {@link AggregateFunction} concatenating the already stored 
string with the long passed as argument.
+        */
+       private static class SumAggr implements AggregateFunction<Long, 
MutableString, String> {
+
+               private static final long serialVersionUID = 
-6249227626701264599L;
+
+               @Override
+               public MutableString createAccumulator() {
+                       return new MutableString();
+               }
+
+               @Override
+               public void add(Long value, MutableString accumulator) {
+                       accumulator.value += ", " + value;
+               }
+
+               @Override
+               public String getResult(MutableString accumulator) {
+                       return accumulator.value;
+               }
+
+               @Override
+               public MutableString merge(MutableString a, MutableString b) {
+                       MutableString nValue = new MutableString();
+                       nValue.value = a.value + ", " + b.value;
+                       return nValue;
+               }
+       }
+
+       private static final class MutableString {
+               String value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
new file mode 100644
index 0000000..d2c9535
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.ImmutableFoldingState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableFoldingState}.
+ */
+public class ImmutableFoldingStateTest {
+
+       private final FoldingStateDescriptor<Long, String> foldingStateDesc =
+                       new FoldingStateDescriptor<>(
+                                       "test",
+                                       "0",
+                                       new SumFold(),
+                                       StringSerializer.INSTANCE);
+
+       private ImmutableFoldingState<Long, String> foldingState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!foldingStateDesc.isSerializerInitialized()) {
+                       foldingStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               ByteArrayOutputStream out = new ByteArrayOutputStream();
+               StringSerializer.INSTANCE.serialize("42", new 
DataOutputViewStreamWrapper(out));
+
+               foldingState = ImmutableFoldingState.createState(
+                               foldingStateDesc,
+                               out.toByteArray()
+               );
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               String value = foldingState.get();
+               assertEquals("42", value);
+
+               foldingState.add(54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               String value = foldingState.get();
+               assertEquals("42", value);
+
+               foldingState.clear();
+       }
+
+       /**
+        * Test {@link FoldFunction} concatenating the already stored string 
with the long passed as argument.
+        */
+       private static class SumFold implements FoldFunction<Long, String> {
+
+               private static final long serialVersionUID = 
-6249227626701264599L;
+
+               @Override
+               public String fold(String accumulator, Long value) throws 
Exception {
+                       long acc = Long.valueOf(accumulator);
+                       acc += value;
+                       return Long.toString(acc);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
new file mode 100644
index 0000000..3283295
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.ImmutableListState;
+import org.apache.flink.runtime.state.heap.HeapListState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableListState}.
+ */
+public class ImmutableListStateTest {
+
+       private final ListStateDescriptor<Long> listStateDesc =
+                       new ListStateDescriptor<>("test", 
BasicTypeInfo.LONG_TYPE_INFO);
+
+       private ImmutableListState<Long> listState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!listStateDesc.isSerializerInitialized()) {
+                       listStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               List<Long> init = new ArrayList<>();
+               init.add(42L);
+
+               byte[] serInit = serializeInitValue(init);
+               listState = ImmutableListState.createState(listStateDesc, 
serInit);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               List<Long> list = getStateContents();
+               assertEquals(1L, list.size());
+
+               long element = list.get(0);
+               assertEquals(42L, element);
+
+               listState.add(54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               List<Long> list = getStateContents();
+               assertEquals(1L, list.size());
+
+               long element = list.get(0);
+               assertEquals(42L, element);
+
+               listState.clear();
+       }
+
+       /**
+        * Copied from {@link HeapListState#getSerializedValue(Object, Object)}.
+        */
+       private byte[] serializeInitValue(List<Long> toSerialize) throws 
IOException {
+               TypeSerializer<Long> serializer = 
listStateDesc.getElementSerializer();
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(baos);
+
+               // write the same as RocksDB writes lists, with one ',' 
separator
+               for (int i = 0; i < toSerialize.size(); i++) {
+                       serializer.serialize(toSerialize.get(i), view);
+                       if (i < toSerialize.size() - 1) {
+                               view.writeByte(',');
+                       }
+               }
+               view.flush();
+
+               return baos.toByteArray();
+       }
+
+       private List<Long> getStateContents() {
+               List<Long> list = new ArrayList<>();
+               for (Long elem: listState.get()) {
+                       list.add(elem);
+               }
+               return list;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
new file mode 100644
index 0000000..30a8a50
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.queryablestate.client.state.ImmutableMapState;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link ImmutableMapState}.
+ */
+public class ImmutableMapStateTest {
+
+       private final MapStateDescriptor<Long, Long> mapStateDesc =
+                       new MapStateDescriptor<>(
+                                       "test",
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       BasicTypeInfo.LONG_TYPE_INFO);
+
+       private ImmutableMapState<Long, Long> mapState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!mapStateDesc.isSerializerInitialized()) {
+                       mapStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               Map<Long, Long> initMap = new HashMap<>();
+               initMap.put(1L, 5L);
+               initMap.put(2L, 5L);
+
+               byte[] initSer = KvStateSerializer.serializeMap(
+                               initMap.entrySet(),
+                               
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+               mapState = ImmutableMapState.createState(mapStateDesc, initSer);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testPut() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               mapState.put(2L, 54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testPutAll() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Map<Long, Long> nMap = new HashMap<>();
+               nMap.put(1L, 7L);
+               nMap.put(2L, 7L);
+
+               mapState.putAll(nMap);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               mapState.put(2L, 54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testIterator() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator();
+               while (iterator.hasNext()) {
+                       iterator.remove();
+               }
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testIterable() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Iterable<Map.Entry<Long, Long>> iterable = mapState.entries();
+               Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator();
+               while (iterator.hasNext()) {
+                       assertEquals(5L, (long) iterator.next().getValue());
+                       iterator.remove();
+               }
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testKeys() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Iterator<Long> iterator = mapState.keys().iterator();
+               while (iterator.hasNext()) {
+                       iterator.remove();
+               }
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testValues() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Iterator<Long> iterator = mapState.values().iterator();
+               while (iterator.hasNext()) {
+                       iterator.remove();
+               }
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               mapState.clear();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
new file mode 100644
index 0000000..9b1ecf8
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableReducingState}.
+ */
+public class ImmutableReducingStateTest {
+
+       private final ReducingStateDescriptor<Long> reducingStateDesc =
+                       new ReducingStateDescriptor<>("test", new SumReduce(), 
BasicTypeInfo.LONG_TYPE_INFO);
+
+       private ImmutableReducingState<Long> reduceState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!reducingStateDesc.isSerializerInitialized()) {
+                       reducingStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               reduceState = ImmutableReducingState.createState(
+                               reducingStateDesc,
+                               
ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+               );
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               long value = reduceState.get();
+               assertEquals(42L, value);
+
+               reduceState.add(54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               long value = reduceState.get();
+               assertEquals(42L, value);
+
+               reduceState.clear();
+       }
+
+       /**
+        * Test {@link ReduceFunction} summing up its two arguments.
+        */
+       private static class SumReduce implements ReduceFunction<Long> {
+
+               private static final long serialVersionUID = 
6041237513913189144L;
+
+               @Override
+               public Long reduce(Long value1, Long value2) throws Exception {
+                       return value1 + value2;
+               }
+       }
+}

Reply via email to