http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
new file mode 100644
index 0000000..cccaacb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link 
org.apache.flink.api.common.state.ValueState} that is snapshotted
+ * into files.
+ * 
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+public class HeapValueState<K, N, V>
+               extends AbstractHeapState<K, N, V, ValueState<V>, 
ValueStateDescriptor<V>>
+               implements ValueState<V> {
+
+       /**
+        * Creates a new key/value state for the given hash map of key/value 
pairs.
+        *
+        * @param backend The state backend backing that created this state.
+        * @param stateDesc The state identifier for the state. This contains 
name
+        *                           and can create a default state value.
+        * @param stateTable The state tab;e to use in this kev/value state. 
May contain initial state.
+        */
+       public HeapValueState(
+                       KeyedStateBackend<K> backend,
+                       ValueStateDescriptor<V> stateDesc,
+                       StateTable<K, N, V> stateTable,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer) {
+               super(backend, stateDesc, stateTable, keySerializer, 
namespaceSerializer);
+       }
+
+       @Override
+       public V value() {
+               Preconditions.checkState(currentNamespace != null, "No 
namespace set.");
+               Preconditions.checkState(backend.getCurrentKey() != null, "No 
key set.");
+
+               Map<N, Map<K, V>> namespaceMap =
+                               
stateTable.get(backend.getCurrentKeyGroupIndex());
+
+               if (namespaceMap == null) {
+                       return stateDesc.getDefaultValue();
+               }
+
+               Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
+
+               if (keyedMap == null) {
+                       return stateDesc.getDefaultValue();
+               }
+
+               V result = keyedMap.get(backend.<K>getCurrentKey());
+
+               if (result == null) {
+                       return stateDesc.getDefaultValue();
+               }
+
+               return result;
+       }
+
+       @Override
+       public void update(V value) {
+               Preconditions.checkState(currentNamespace != null, "No 
namespace set.");
+               Preconditions.checkState(backend.getCurrentKey() != null, "No 
key set.");
+
+               if (value == null) {
+                       clear();
+                       return;
+               }
+
+               Map<N, Map<K, V>> namespaceMap =
+                               
stateTable.get(backend.getCurrentKeyGroupIndex());
+
+               if (namespaceMap == null) {
+                       namespaceMap = createNewMap();
+                       stateTable.set(backend.getCurrentKeyGroupIndex(), 
namespaceMap);
+               }
+
+               Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
+
+               if (keyedMap == null) {
+                       keyedMap = createNewMap();
+                       namespaceMap.put(currentNamespace, keyedMap);
+               }
+
+               keyedMap.put(backend.<K>getCurrentKey(), value);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
new file mode 100644
index 0000000..96e23d6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class StateTable<K, N, ST> {
+
+       /** Serializer for the state value. The state value could be a List<V>, 
for example. */
+       protected final TypeSerializer<ST> stateSerializer;
+
+       /** The serializer for the namespace */
+       protected final TypeSerializer<N> namespaceSerializer;
+
+       /** Map for holding the actual state objects. */
+       private final List<Map<N, Map<K, ST>>> state;
+
+       protected final KeyGroupRange keyGroupRange;
+
+       public StateTable(
+                       TypeSerializer<ST> stateSerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       KeyGroupRange keyGroupRange) {
+               this.stateSerializer = stateSerializer;
+               this.namespaceSerializer = namespaceSerializer;
+               this.keyGroupRange = keyGroupRange;
+
+               this.state = Arrays.asList((Map<N, Map<K, ST>>[]) new 
Map[keyGroupRange.getNumberOfKeyGroups()]);
+       }
+
+       private int indexToOffset(int index) {
+               return index - keyGroupRange.getStartKeyGroup();
+       }
+
+       public Map<N, Map<K, ST>> get(int index) {
+               return keyGroupRange.contains(index) ? 
state.get(indexToOffset(index)) : null;
+       }
+
+       public void set(int index, Map<N, Map<K, ST>> map) {
+               if (!keyGroupRange.contains(index)) {
+                       throw new RuntimeException("Unexpected key group index. 
This indicates a bug.");
+               }
+               state.set(indexToOffset(index), map);
+       }
+
+       public TypeSerializer<ST> getStateSerializer() {
+               return stateSerializer;
+       }
+
+       public TypeSerializer<N> getNamespaceSerializer() {
+               return namespaceSerializer;
+       }
+
+       public List<Map<N, Map<K, ST>>> getState() {
+               return state;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
deleted file mode 100644
index cae673d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.memory;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.AbstractHeapState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Base class for partitioned {@link ListState} implementations that are 
backed by a regular
- * heap hash map. The concrete implementations define how the state is 
checkpointed.
- * 
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <SV> The type of the values in the state.
- * @param <S> The type of State
- * @param <SD> The type of StateDescriptor for the State S
- */
-public abstract class AbstractMemState<K, N, SV, S extends State, SD extends 
StateDescriptor<S, ?>>
-               extends AbstractHeapState<K, N, SV, S, SD, MemoryStateBackend> {
-
-       public AbstractMemState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               TypeSerializer<SV> stateSerializer,
-               SD stateDesc) {
-               super(keySerializer, namespaceSerializer, stateSerializer, 
stateDesc);
-       }
-
-       public AbstractMemState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               TypeSerializer<SV> stateSerializer,
-               SD stateDesc,
-               HashMap<N, Map<K, SV>> state) {
-               super(keySerializer, namespaceSerializer, stateSerializer, 
stateDesc, state);
-       }
-
-       public abstract KvStateSnapshot<K, N, S, SD, MemoryStateBackend> 
createHeapSnapshot(byte[] bytes);
-
-       @Override
-       public KvStateSnapshot<K, N, S, SD, MemoryStateBackend> snapshot(long 
checkpointId, long timestamp) throws Exception {
-
-               DataOutputSerializer out = new 
DataOutputSerializer(Math.max(size() * 16, 16));
-
-               out.writeInt(state.size());
-               for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) 
{
-                       N namespace = namespaceState.getKey();
-                       namespaceSerializer.serialize(namespace, out);
-                       out.writeInt(namespaceState.getValue().size());
-                       for (Map.Entry<K, SV> entry: 
namespaceState.getValue().entrySet()) {
-                               keySerializer.serialize(entry.getKey(), out);
-                               stateSerializer.serialize(entry.getValue(), 
out);
-                       }
-               }
-
-               byte[] bytes = out.getCopyOfBuffer();
-
-               return createHeapSnapshot(bytes);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
deleted file mode 100644
index e1b62d2..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.memory;
-
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A snapshot of a {@link MemValueState} for a checkpoint. The data is stored 
in a heap byte
- * array, in serialized form.
- * 
- * @param <K> The type of the key in the snapshot state.
- * @param <N> The type of the namespace in the snapshot state.
- * @param <SV> The type of the value in the snapshot state.
- */
-public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD 
extends StateDescriptor<S, ?>> 
-               implements KvStateSnapshot<K, N, S, SD, MemoryStateBackend> {
-
-       private static final long serialVersionUID = 1L;
-
-       /** Key Serializer */
-       protected final TypeSerializer<K> keySerializer;
-
-       /** Namespace Serializer */
-       protected final TypeSerializer<N> namespaceSerializer;
-
-       /** Serializer for the state value */
-       protected final TypeSerializer<SV> stateSerializer;
-
-       /** StateDescriptor, for sanity checks */
-       protected final SD stateDesc;
-
-       /** The serialized data of the state key/value pairs */
-       private final byte[] data;
-       
-       private transient boolean closed;
-
-       /**
-        * Creates a new heap memory state snapshot.
-        *
-        * @param keySerializer The serializer for the keys.
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateSerializer The serializer for the elements in the state 
HashMap
-        * @param stateDesc The state identifier
-        * @param data The serialized data of the state key/value pairs
-        */
-       public AbstractMemStateSnapshot(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               TypeSerializer<SV> stateSerializer,
-               SD stateDesc,
-               byte[] data) {
-               this.keySerializer = keySerializer;
-               this.namespaceSerializer = namespaceSerializer;
-               this.stateSerializer = stateSerializer;
-               this.stateDesc = stateDesc;
-               this.data = data;
-       }
-
-       public abstract KvState<K, N, S, SD, MemoryStateBackend> 
createMemState(HashMap<N, Map<K, SV>> stateMap);
-
-       @Override
-       public KvState<K, N, S, SD, MemoryStateBackend> restoreState(
-               MemoryStateBackend stateBackend,
-               final TypeSerializer<K> keySerializer,
-               ClassLoader classLoader) throws Exception {
-
-               // validity checks
-               if (!this.keySerializer.equals(keySerializer)) {
-                       throw new IllegalArgumentException(
-                               "Cannot restore the state from the snapshot 
with the given serializers. " +
-                                       "State (K/V) was serialized with " +
-                                       "(" + this.keySerializer + ") " +
-                                       "now is (" + keySerializer + ")");
-               }
-
-               if (closed) {
-                       throw new IOException("snapshot has been closed");
-               }
-
-               // restore state
-               DataInputDeserializer inView = new DataInputDeserializer(data, 
0, data.length);
-
-               final int numKeys = inView.readInt();
-               HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
-
-               for (int i = 0; i < numKeys && !closed; i++) {
-                       N namespace = namespaceSerializer.deserialize(inView);
-                       final int numValues = inView.readInt();
-                       Map<K, SV> namespaceMap = new HashMap<>(numValues);
-                       stateMap.put(namespace, namespaceMap);
-                       for (int j = 0; j < numValues; j++) {
-                               K key = keySerializer.deserialize(inView);
-                               SV value = stateSerializer.deserialize(inView);
-                               namespaceMap.put(key, value);
-                       }
-               }
-
-               if (closed) {
-                       throw new IOException("snapshot has been closed");
-               }
-
-               return createMemState(stateMap);
-       }
-
-       /**
-        * Discarding the heap state is a no-op.
-        */
-       @Override
-       public void discardState() {}
-
-       @Override
-       public long getStateSize() {
-               return data.length;
-       }
-
-       @Override
-       public void close() {
-               closed = true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index a42bec2..b9ff255 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -51,7 +51,7 @@ public class ByteStreamStateHandle extends 
AbstractCloseableHandle implements St
        }
 
        @Override
-       public FSDataInputStream openInputStream() throws Exception {
+       public FSDataInputStream openInputStream() throws IOException {
                ensureNotClosed();
 
                FSDataInputStream inputStream = new FSDataInputStream() {

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
new file mode 100644
index 0000000..4801d85
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.IOException;
+
+/**
+ * {@link CheckpointStreamFactory} that produces streams that write to 
in-memory byte arrays.
+ */
+public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
+
+       /** The maximal size that the snapshotted memory state may have */
+       private final int maxStateSize;
+
+       /**
+        * Creates a new in-memory stream factory that accepts states whose 
serialized forms are
+        * up to the given number of bytes.
+        *
+        * @param maxStateSize The maximal size of the serialized state
+        */
+       public MemCheckpointStreamFactory(int maxStateSize) {
+               this.maxStateSize = maxStateSize;
+       }
+
+       @Override
+       public void close() throws Exception {}
+
+       @Override
+       public CheckpointStateOutputStream createCheckpointStateOutputStream(
+                       long checkpointID, long timestamp) throws Exception
+       {
+               return new MemoryCheckpointOutputStream(maxStateSize);
+       }
+
+       @Override
+       public String toString() {
+               return "In-Memory Stream Factory";
+       }
+
+       static void checkSize(int size, int maxSize) throws IOException {
+               if (size > maxSize) {
+                       throw new IOException(
+                                       "Size of the state is larger than the 
maximum permitted memory-backed state. Size="
+                                                       + size + " , maxSize=" 
+ maxSize
+                                                       + " . Consider using a 
different state backend, like the File System State backend.");
+               }
+       }
+
+
+
+       /**
+        * A {@code CheckpointStateOutputStream} that writes into a byte array.
+        */
+       public static final class MemoryCheckpointOutputStream extends 
CheckpointStateOutputStream {
+
+               private final ByteArrayOutputStreamWithPos os = new 
ByteArrayOutputStreamWithPos();
+
+               private final int maxSize;
+
+               private boolean closed;
+
+               boolean isEmpty = true;
+
+               public MemoryCheckpointOutputStream(int maxSize) {
+                       this.maxSize = maxSize;
+               }
+
+               @Override
+               public void write(int b) {
+                       os.write(b);
+                       isEmpty = false;
+               }
+
+               @Override
+               public void write(byte[] b, int off, int len) {
+                       os.write(b, off, len);
+                       isEmpty = false;
+               }
+
+               @Override
+               public void flush() throws IOException {
+                       os.flush();
+               }
+
+               @Override
+               public void sync() throws IOException { }
+
+               // 
--------------------------------------------------------------------
+
+               @Override
+               public void close() {
+                       closed = true;
+                       os.reset();
+               }
+
+               @Override
+               public StreamStateHandle closeAndGetHandle() throws IOException 
{
+                       if (isEmpty) {
+                               return null;
+                       }
+                       return new ByteStreamStateHandle(closeAndGetBytes());
+               }
+
+               @Override
+               public long getPos() throws IOException {
+                       return os.getPosition();
+               }
+
+               /**
+                * Closes the stream and returns the byte array containing the 
stream's data.
+                * @return The byte array containing the stream's data.
+                * @throws IOException Thrown if the size of the data exceeds 
the maximal
+                */
+               public byte[] closeAndGetBytes() throws IOException {
+                       if (!closed) {
+                               checkSize(os.size(), maxSize);
+                               byte[] bytes = os.toByteArray();
+                               close();
+                               return bytes;
+                       }
+                       else {
+                               throw new IllegalStateException("stream has 
already been closed");
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
deleted file mode 100644
index a4dec3b..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.memory;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Heap-backed partitioned {@link FoldingState} that is
- * snapshotted into a serialized memory copy.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that can be folded into the state.
- * @param <ACC> The type of the value in the folding state.
- */
-public class MemFoldingState<K, N, T, ACC>
-       extends AbstractMemState<K, N, ACC, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>>
-       implements FoldingState<T, ACC> {
-
-       private final FoldFunction<T, ACC> foldFunction;
-
-       public MemFoldingState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               FoldingStateDescriptor<T, ACC> stateDesc) {
-               super(keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc);
-               this.foldFunction = stateDesc.getFoldFunction();
-       }
-
-       public MemFoldingState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               FoldingStateDescriptor<T, ACC> stateDesc,
-               HashMap<N, Map<K, ACC>> state) {
-               super(keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc, state);
-               this.foldFunction = stateDesc.getFoldFunction();
-       }
-
-       @Override
-       public ACC get() {
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = state.get(currentNamespace);
-               }
-               if (currentNSState != null) {
-                       Preconditions.checkState(currentKey != null, "No key 
set");
-                       return currentNSState.get(currentKey);
-               } else {
-                       return null;
-               }
-       }
-
-       @Override
-       public void add(T value) throws IOException {
-               Preconditions.checkState(currentKey != null, "No key set");
-
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = createNewNamespaceMap();
-                       state.put(currentNamespace, currentNSState);
-               }
-
-               ACC currentValue = currentNSState.get(currentKey);
-               try {
-                       if (currentValue == null) {
-                               currentNSState.put(currentKey, 
foldFunction.fold(stateDesc.getDefaultValue(), value));
-                       } else {
-                                       currentNSState.put(currentKey, 
foldFunction.fold(currentValue, value));
-
-                       }
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not add value to 
folding state.", e);
-               }
-       }
-
-       @Override
-       public KvStateSnapshot<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createHeapSnapshot(byte[] 
bytes) {
-               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), stateSerializer, stateDesc, bytes);
-       }
-
-       @Override
-       public byte[] getSerializedValue(K key, N namespace) throws Exception {
-               Preconditions.checkNotNull(key, "Key");
-               Preconditions.checkNotNull(namespace, "Namespace");
-
-               Map<K, ACC> stateByKey = state.get(namespace);
-
-               if (stateByKey != null) {
-                       return 
KvStateRequestSerializer.serializeValue(stateByKey.get(key), 
stateDesc.getSerializer());
-               } else {
-                       return null;
-               }
-       }
-
-       public static class Snapshot<K, N, T, ACC> extends 
AbstractMemStateSnapshot<K, N, ACC, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>> {
-               private static final long serialVersionUID = 1L;
-
-               public Snapshot(TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<ACC> stateSerializer,
-                       FoldingStateDescriptor<T, ACC> stateDescs, byte[] data) 
{
-                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, data);
-               }
-
-               @Override
-               public KvState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createMemState(HashMap<N, 
Map<K, ACC>> stateMap) {
-                       return new MemFoldingState<>(keySerializer, 
namespaceSerializer, stateDesc, stateMap);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
deleted file mode 100644
index 20b6eb5..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.memory;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.ArrayListSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} 
that is snapshotted
- * into a serialized memory copy.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of the values in the list state.
- */
-public class MemListState<K, N, V>
-       extends AbstractMemState<K, N, ArrayList<V>, ListState<V>, 
ListStateDescriptor<V>>
-       implements ListState<V> {
-
-       public MemListState(TypeSerializer<K> keySerializer, TypeSerializer<N> 
namespaceSerializer, ListStateDescriptor<V> stateDesc) {
-               super(keySerializer, namespaceSerializer, new 
ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc);
-       }
-
-       public MemListState(TypeSerializer<K> keySerializer, TypeSerializer<N> 
namespaceSerializer, ListStateDescriptor<V> stateDesc, HashMap<N, Map<K, 
ArrayList<V>>> state) {
-               super(keySerializer, namespaceSerializer, new 
ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc, state);
-       }
-
-       @Override
-       public Iterable<V> get() {
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = state.get(currentNamespace);
-               }
-               if (currentNSState != null) {
-                       Preconditions.checkState(currentKey != null, "No key 
set");
-                       return currentNSState.get(currentKey);
-               } else {
-                       return null;
-               }
-       }
-
-       @Override
-       public void add(V value) {
-               Preconditions.checkState(currentKey != null, "No key set");
-
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = createNewNamespaceMap();
-                       state.put(currentNamespace, currentNSState);
-               }
-
-               ArrayList<V> list = currentNSState.get(currentKey);
-               if (list == null) {
-                       list = new ArrayList<>();
-                       currentNSState.put(currentKey, list);
-               }
-               list.add(value);
-       }
-
-       @Override
-       public KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, 
MemoryStateBackend> createHeapSnapshot(byte[] bytes) {
-               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), stateSerializer, stateDesc, bytes);
-       }
-
-       @Override
-       public byte[] getSerializedValue(K key, N namespace) throws Exception {
-               Preconditions.checkNotNull(key, "Key");
-               Preconditions.checkNotNull(namespace, "Namespace");
-
-               Map<K, ArrayList<V>> stateByKey = state.get(namespace);
-               if (stateByKey != null) {
-                       return 
KvStateRequestSerializer.serializeList(stateByKey.get(key), 
stateDesc.getSerializer());
-               } else {
-                       return null;
-               }
-       }
-
-       public static class Snapshot<K, N, V> extends 
AbstractMemStateSnapshot<K, N, ArrayList<V>, ListState<V>, 
ListStateDescriptor<V>> {
-               private static final long serialVersionUID = 1L;
-
-               public Snapshot(TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<ArrayList<V>> stateSerializer,
-                       ListStateDescriptor<V> stateDescs, byte[] data) {
-                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, data);
-               }
-
-               @Override
-               public KvState<K, N, ListState<V>, ListStateDescriptor<V>, 
MemoryStateBackend> createMemState(HashMap<N, Map<K, ArrayList<V>>> stateMap) {
-                       return new MemListState<>(keySerializer, 
namespaceSerializer, stateDesc, stateMap);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemReducingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemReducingState.java
deleted file mode 100644
index 9a4c676..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemReducingState.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.memory;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Heap-backed partitioned {@link 
org.apache.flink.api.common.state.ReducingState} that is
- * snapshotted into a serialized memory copy.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of the values in the list state.
- */
-public class MemReducingState<K, N, V>
-       extends AbstractMemState<K, N, V, ReducingState<V>, 
ReducingStateDescriptor<V>>
-       implements ReducingState<V> {
-
-       private final ReduceFunction<V> reduceFunction;
-
-       public MemReducingState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ReducingStateDescriptor<V> stateDesc) {
-               super(keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc);
-               this.reduceFunction = stateDesc.getReduceFunction();
-       }
-
-       public MemReducingState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ReducingStateDescriptor<V> stateDesc,
-               HashMap<N, Map<K, V>> state) {
-               super(keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc, state);
-               this.reduceFunction = stateDesc.getReduceFunction();
-       }
-
-       @Override
-       public V get() {
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = state.get(currentNamespace);
-               }
-               if (currentNSState != null) {
-                       Preconditions.checkState(currentKey != null, "No key 
set");
-                       return currentNSState.get(currentKey);
-               }
-               return null;
-       }
-
-       @Override
-       public void add(V value) throws IOException {
-               Preconditions.checkState(currentKey != null, "No key set");
-
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = createNewNamespaceMap();
-                       state.put(currentNamespace, currentNSState);
-               }
-//             currentKeyState.merge(currentNamespace, value, new 
BiFunction<V, V, V>() {
-//                     @Override
-//                     public V apply(V v, V v2) {
-//                             try {
-//                                     return reduceFunction.reduce(v, v2);
-//                             } catch (Exception e) {
-//                                     return null;
-//                             }
-//                     }
-//             });
-               V currentValue = currentNSState.get(currentKey);
-               if (currentValue == null) {
-                       currentNSState.put(currentKey, value);
-               } else {
-                       try {
-                               currentNSState.put(currentKey, 
reduceFunction.reduce(currentValue, value));
-                       } catch (Exception e) {
-                               throw new RuntimeException("Could not add value 
to reducing state.", e);
-                       }
-               }
-       }
-
-       @Override
-       public KvStateSnapshot<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, MemoryStateBackend> createHeapSnapshot(byte[] 
bytes) {
-               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), stateSerializer, stateDesc, bytes);
-       }
-
-       @Override
-       public byte[] getSerializedValue(K key, N namespace) throws Exception {
-               Preconditions.checkNotNull(key, "Key");
-               Preconditions.checkNotNull(namespace, "Namespace");
-
-               Map<K, V> stateByKey = state.get(namespace);
-               if (stateByKey != null) {
-                       return 
KvStateRequestSerializer.serializeValue(stateByKey.get(key), 
stateDesc.getSerializer());
-               } else {
-                       return null;
-               }
-       }
-
-       public static class Snapshot<K, N, V> extends 
AbstractMemStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> 
{
-               private static final long serialVersionUID = 1L;
-
-               public Snapshot(TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<V> stateSerializer,
-                       ReducingStateDescriptor<V> stateDescs, byte[] data) {
-                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, data);
-               }
-
-               @Override
-               public KvState<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, MemoryStateBackend> createMemState(HashMap<N, 
Map<K, V>> stateMap) {
-                       return new MemReducingState<>(keySerializer, 
namespaceSerializer, stateDesc, stateMap);
-               }
-       }}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java
deleted file mode 100644
index c0e3779..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.memory;
-
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Heap-backed key/value state that is snapshotted into a serialized memory 
copy.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of the value.
- */
-public class MemValueState<K, N, V>
-       extends AbstractMemState<K, N, V, ValueState<V>, 
ValueStateDescriptor<V>>
-       implements ValueState<V> {
-       
-       public MemValueState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ValueStateDescriptor<V> stateDesc) {
-               super(keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc);
-       }
-
-       public MemValueState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ValueStateDescriptor<V> stateDesc,
-               HashMap<N, Map<K, V>> state) {
-               super(keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc, state);
-       }
-
-       @Override
-       public V value() {
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = state.get(currentNamespace);
-               }
-               if (currentNSState != null) {
-                       Preconditions.checkState(currentKey != null, "No key 
set");
-                       V value = currentNSState.get(currentKey);
-                       return value != null ? value : 
stateDesc.getDefaultValue();
-               }
-               return stateDesc.getDefaultValue();
-       }
-
-       @Override
-       public void update(V value) {
-               Preconditions.checkState(currentKey != null, "No key set");
-
-               if (value == null) {
-                       clear();
-                       return;
-               }
-
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = createNewNamespaceMap();
-                       state.put(currentNamespace, currentNSState);
-               }
-
-               currentNSState.put(currentKey, value);
-       }
-
-       @Override
-       public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, 
MemoryStateBackend> createHeapSnapshot(byte[] bytes) {
-               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), stateSerializer, stateDesc, bytes);
-       }
-
-       @Override
-       public byte[] getSerializedValue(K key, N namespace) throws Exception {
-               Preconditions.checkNotNull(key, "Key");
-               Preconditions.checkNotNull(namespace, "Namespace");
-
-               Map<K, V> stateByKey = state.get(namespace);
-               V value = stateByKey != null ? stateByKey.get(key) : 
stateDesc.getDefaultValue();
-               if (value != null) {
-                       return KvStateRequestSerializer.serializeValue(value, 
stateDesc.getSerializer());
-               } else {
-                       return 
KvStateRequestSerializer.serializeValue(stateDesc.getDefaultValue(), 
stateDesc.getSerializer());
-               }
-       }
-
-       public static class Snapshot<K, N, V> extends 
AbstractMemStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
-               private static final long serialVersionUID = 1L;
-
-               public Snapshot(TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<V> stateSerializer,
-                       ValueStateDescriptor<V> stateDescs, byte[] data) {
-                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, data);
-               }
-
-               @Override
-               public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, 
MemoryStateBackend> createMemState(HashMap<N, Map<K, V>> stateMap) {
-                       return new MemValueState<>(keySerializer, 
namespaceSerializer, stateDesc, stateMap);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index af84394..654c367 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -18,21 +18,20 @@
 
 package org.apache.flink.runtime.state.memory;
 
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
-
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * A {@link AbstractStateBackend} that stores all its data and checkpoints in 
memory and has no
@@ -67,142 +66,47 @@ public class MemoryStateBackend extends 
AbstractStateBackend {
                this.maxStateSize = maxStateSize;
        }
 
-       // 
------------------------------------------------------------------------
-       //  initialization and cleanup
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void disposeAllStateForCurrentJob() {
-               // nothing to do here, GC will do it
-       }
-
-       @Override
-       public void close() throws Exception {}
-
-       // 
------------------------------------------------------------------------
-       //  State backend operations
-       // 
------------------------------------------------------------------------
-
        @Override
-       public <N, V> ValueState<V> createValueState(TypeSerializer<N> 
namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
-               return new MemValueState<>(keySerializer, namespaceSerializer, 
stateDesc);
-       }
-
-       @Override
-       public <N, T> ListState<T> createListState(TypeSerializer<N> 
namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
-               return new MemListState<>(keySerializer, namespaceSerializer, 
stateDesc);
-       }
-
-       @Override
-       public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> 
namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
-               return new MemReducingState<>(keySerializer, 
namespaceSerializer, stateDesc);
+       public String toString() {
+               return "MemoryStateBackend (data in heap memory / checkpoints 
to JobManager)";
        }
 
        @Override
-       public <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer, 
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-               return new MemFoldingState<>(keySerializer, 
namespaceSerializer, stateDesc);
+       public CheckpointStreamFactory createStreamFactory(JobID jobId, String 
operatorIdentifier) throws IOException {
+               return new MemCheckpointStreamFactory(maxStateSize);
        }
 
        @Override
-       public CheckpointStateOutputStream createCheckpointStateOutputStream(
-                       long checkpointID, long timestamp) throws Exception
-       {
-               return new MemoryCheckpointOutputStream(maxStateSize);
+       public <K> KeyedStateBackend<K> createKeyedStateBackend(
+                       Environment env, JobID jobID,
+                       String operatorIdentifier, TypeSerializer<K> 
keySerializer,
+                       KeyGroupAssigner<K> keyGroupAssigner,
+                       KeyGroupRange keyGroupRange,
+                       TaskKvStateRegistry kvStateRegistry) throws IOException 
{
+
+               return new HeapKeyedStateBackend<>(
+                               kvStateRegistry,
+                               keySerializer,
+                               keyGroupAssigner,
+                               keyGroupRange);
        }
 
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
        @Override
-       public String toString() {
-               return "MemoryStateBackend (data in heap memory / checkpoints 
to JobManager)";
+       public <K> KeyedStateBackend<K> restoreKeyedStateBackend(
+                       Environment env, JobID jobID,
+                       String operatorIdentifier,
+                       TypeSerializer<K> keySerializer,
+                       KeyGroupAssigner<K> keyGroupAssigner,
+                       KeyGroupRange keyGroupRange,
+                       List<KeyGroupsStateHandle> restoredState,
+                       TaskKvStateRegistry kvStateRegistry) throws Exception {
+
+               return new HeapKeyedStateBackend<>(
+                               kvStateRegistry,
+                               keySerializer,
+                               keyGroupAssigner,
+                               keyGroupRange,
+                               restoredState);
        }
 
-       static void checkSize(int size, int maxSize) throws IOException {
-               if (size > maxSize) {
-                       throw new IOException(
-                                       "Size of the state is larger than the 
maximum permitted memory-backed state. Size="
-                                                       + size + " , maxSize=" 
+ maxSize
-                                                       + " . Consider using a 
different state backend, like the File System State backend.");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * A CheckpointStateOutputStream that writes into a byte array.
-        */
-       public static final class MemoryCheckpointOutputStream extends 
CheckpointStateOutputStream {
-
-               private final ByteArrayOutputStream os = new 
ByteArrayOutputStream();
-
-               private final int maxSize;
-
-               private boolean closed;
-
-               public MemoryCheckpointOutputStream(int maxSize) {
-                       this.maxSize = maxSize;
-               }
-
-               @Override
-               public void write(int b) {
-                       os.write(b);
-               }
-
-               @Override
-               public void write(byte[] b, int off, int len) {
-                       os.write(b, off, len);
-               }
-
-               @Override
-               public void flush() throws IOException {
-                       os.flush();
-               }
-
-               @Override
-               public void sync() throws IOException { }
-
-               // 
--------------------------------------------------------------------
-
-               @Override
-               public void close() {
-                       closed = true;
-                       os.reset();
-               }
-
-               @Override
-               public StreamStateHandle closeAndGetHandle() throws IOException 
{
-                       return new ByteStreamStateHandle(closeAndGetBytes());
-               }
-
-               /**
-                * Closes the stream and returns the byte array containing the 
stream's data.
-                * @return The byte array containing the stream's data.
-                * @throws IOException Thrown if the size of the data exceeds 
the maximal
-                */
-               public byte[] closeAndGetBytes() throws IOException {
-                       if (!closed) {
-                               checkSize(os.size(), maxSize);
-                               byte[] bytes = os.toByteArray();
-                               close();
-                               return bytes;
-                       }
-                       else {
-                               throw new IllegalStateException("stream has 
already been closed");
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Static default instance
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the default instance of this state backend, using the default 
maximal state size.
-        * @return The default instance of this state backend.
-        */
-       public static MemoryStateBackend create() {
-               return new MemoryStateBackend();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index d703bd6..766531a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -87,14 +87,14 @@ public class SavepointLoaderTest {
                loaded.discardState();
                verify(state, times(0)).discardState();
 
-               // 2) Load and validate: parallelism mismatch
-               when(vertex.getParallelism()).thenReturn(222);
+               // 2) Load and validate: max parallelism mismatch
+               when(vertex.getMaxParallelism()).thenReturn(222);
 
                try {
                        SavepointLoader.loadAndValidateSavepoint(jobId, tasks, 
store, path);
                        fail("Did not throw expected Exception");
                } catch (IllegalStateException expected) {
-                       assertTrue(expected.getMessage().contains("Parallelism 
mismatch"));
+                       assertTrue(expected.getMessage().contains("Max 
parallelism mismatch"));
                }
 
                // 3) Load and validate: missing vertex (this should be relaxed)

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 56da9c8..39ea176 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -48,6 +48,7 @@ public class TaskDeploymentDescriptorTest {
                        final ExecutionAttemptID execId = new 
ExecutionAttemptID();
                        final String jobName = "job name";
                        final String taskName = "task name";
+                       final int numberOfKeyGroups = 1;
                        final int indexInSubtaskGroup = 0;
                        final int currentNumberOfSubtasks = 1;
                        final int attemptNumber = 0;
@@ -61,7 +62,7 @@ public class TaskDeploymentDescriptorTest {
                        final SerializedValue<ExecutionConfig> executionConfig 
= new SerializedValue<>(new ExecutionConfig());
 
                        final TaskDeploymentDescriptor orig = new 
TaskDeploymentDescriptor(jobID, jobName, vertexID, execId,
-                               executionConfig, taskName, indexInSubtaskGroup, 
currentNumberOfSubtasks, attemptNumber,
+                               executionConfig, taskName, numberOfKeyGroups, 
indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber,
                                jobConfiguration, taskConfiguration, 
invokableClass.getName(), producedResults, inputGates,
                                requiredJars, requiredClasspaths, 47);
        
@@ -76,6 +77,7 @@ public class TaskDeploymentDescriptorTest {
                        assertEquals(orig.getJobID(), copy.getJobID());
                        assertEquals(orig.getVertexID(), copy.getVertexID());
                        assertEquals(orig.getTaskName(), copy.getTaskName());
+                       assertEquals(orig.getNumberOfKeyGroups(), 
copy.getNumberOfKeyGroups());
                        assertEquals(orig.getIndexInSubtaskGroup(), 
copy.getIndexInSubtaskGroup());
                        assertEquals(orig.getNumberOfSubtasks(), 
copy.getNumberOfSubtasks());
                        assertEquals(orig.getAttemptNumber(), 
copy.getAttemptNumber());

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index c6eb249..6a6ac64 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -120,7 +120,7 @@ public class CheckpointMessagesTest {
                public void close() throws IOException {}
 
                @Override
-               public FSDataInputStream openInputStream() throws Exception {
+               public FSDataInputStream openInputStream() throws IOException {
                        return null;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index b80ff6a..b2c5dc7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -77,7 +77,7 @@ public class TaskManagerGroupTest {
                        execution11, 
                        new SerializedValue<>(new ExecutionConfig()), 
                        "test", 
-                       17, 18, 0, 
+                       18, 17, 18, 0,
                        new Configuration(), new Configuration(), 
                        "", 
                        new ArrayList<ResultPartitionDeploymentDescriptor>(), 
@@ -92,7 +92,7 @@ public class TaskManagerGroupTest {
                        execution12,
                        new SerializedValue<>(new ExecutionConfig()),
                        "test",
-                       13, 18, 1,
+                       18, 13, 18, 1,
                        new Configuration(), new Configuration(),
                        "",
                        new ArrayList<ResultPartitionDeploymentDescriptor>(),
@@ -107,7 +107,7 @@ public class TaskManagerGroupTest {
                        execution21,
                        new SerializedValue<>(new ExecutionConfig()),
                        "test",
-                       7, 18, 2,
+                       18, 7, 18, 2,
                        new Configuration(), new Configuration(),
                        "",
                        new ArrayList<ResultPartitionDeploymentDescriptor>(),
@@ -122,7 +122,7 @@ public class TaskManagerGroupTest {
                        execution13,
                        new SerializedValue<>(new ExecutionConfig()),
                        "test",
-                       0, 18, 0,
+                       18, 0, 18, 0,
                        new Configuration(), new Configuration(),
                        "",
                        new ArrayList<ResultPartitionDeploymentDescriptor>(),
@@ -193,7 +193,7 @@ public class TaskManagerGroupTest {
                        execution11,
                        new SerializedValue<>(new ExecutionConfig()),
                        "test",
-                       17, 18, 0,
+                       18, 17, 18, 0,
                        new Configuration(), new Configuration(),
                        "",
                        new ArrayList<ResultPartitionDeploymentDescriptor>(),
@@ -208,7 +208,7 @@ public class TaskManagerGroupTest {
                        execution12,
                        new SerializedValue<>(new ExecutionConfig()),
                        "test",
-                       13, 18, 1,
+                       18, 13, 18, 1,
                        new Configuration(), new Configuration(),
                        "",
                        new ArrayList<ResultPartitionDeploymentDescriptor>(),
@@ -223,7 +223,7 @@ public class TaskManagerGroupTest {
                        execution21,
                        new SerializedValue<>(new ExecutionConfig()),
                        "test",
-                       7, 18, 1,
+                       18, 7, 18, 1,
                        new Configuration(), new Configuration(),
                        "",
                        new ArrayList<ResultPartitionDeploymentDescriptor>(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 19317f9..4654507 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -53,13 +53,14 @@ public class DummyEnvironment implements Environment {
        private final ExecutionAttemptID executionId = new ExecutionAttemptID();
        private final ExecutionConfig executionConfig = new ExecutionConfig();
        private final TaskInfo taskInfo;
-       private final KvStateRegistry kvStateRegistry = new KvStateRegistry();
-       private final TaskKvStateRegistry taskKvStateRegistry;
+       private KvStateRegistry kvStateRegistry = new KvStateRegistry();
 
        public DummyEnvironment(String taskName, int numSubTasks, int 
subTaskIndex) {
-               this.taskInfo = new TaskInfo(taskName, subTaskIndex, 
numSubTasks, 0);
+               this.taskInfo = new TaskInfo(taskName, numSubTasks, 
subTaskIndex, numSubTasks, 0);
+       }
 
-               this.taskKvStateRegistry = 
kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
+       public void setKvStateRegistry(KvStateRegistry kvStateRegistry) {
+               this.kvStateRegistry = kvStateRegistry;
        }
 
        public KvStateRegistry getKvStateRegistry() {
@@ -148,7 +149,7 @@ public class DummyEnvironment implements Environment {
 
        @Override
        public TaskKvStateRegistry getTaskKvStateRegistry() {
-               return taskKvStateRegistry;
+               return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 2c76399..e7bf6e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -99,11 +99,24 @@ public class MockEnvironment implements Environment {
        private final int bufferSize;
 
        public MockEnvironment(String taskName, long memorySize, 
MockInputSplitProvider inputSplitProvider, int bufferSize) {
-               this(taskName, memorySize, inputSplitProvider, bufferSize, new 
Configuration());
+               this(taskName, memorySize, inputSplitProvider, bufferSize, new 
Configuration(), new ExecutionConfig());
        }
 
-       public MockEnvironment(String taskName, long memorySize, 
MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration 
taskConfiguration) {
-               this.taskInfo = new TaskInfo(taskName, 0, 1, 0);
+       public MockEnvironment(String taskName, long memorySize, 
MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration 
taskConfiguration, ExecutionConfig executionConfig) {
+               this(taskName, memorySize, inputSplitProvider, bufferSize, 
taskConfiguration, executionConfig, 1, 1, 0);
+       }
+
+       public MockEnvironment(
+                       String taskName,
+                       long memorySize,
+                       MockInputSplitProvider inputSplitProvider,
+                       int bufferSize,
+                       Configuration taskConfiguration,
+                       ExecutionConfig executionConfig,
+                       int maxParallelism,
+                       int parallelism,
+                       int subtaskIndex) {
+               this.taskInfo = new TaskInfo(taskName, maxParallelism, 
subtaskIndex, parallelism, 0);
                this.jobConfiguration = new Configuration();
                this.taskConfiguration = taskConfiguration;
                this.inputs = new LinkedList<InputGate>();
@@ -111,7 +124,7 @@ public class MockEnvironment implements Environment {
 
                this.memManager = new MemoryManager(memorySize, 1);
                this.ioManager = new IOManagerAsync();
-               this.executionConfig = new ExecutionConfig();
+               this.executionConfig = executionConfig;
                this.inputSplitProvider = inputSplitProvider;
                this.bufferSize = bufferSize;
 
@@ -121,6 +134,7 @@ public class MockEnvironment implements Environment {
                this.kvStateRegistry = registry.createTaskRegistry(jobID, 
getJobVertexId());
        }
 
+
        public IteratorWrappingTestSingleInputGate<Record> 
addInput(MutableObjectIterator<Record> inputIterator) {
                try {
                        final IteratorWrappingTestSingleInputGate<Record> 
reader = new IteratorWrappingTestSingleInputGate<Record>(bufferSize, 
Record.class, inputIterator);

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 36f2b45..3380907 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -26,14 +26,20 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateClient;
 import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.query.netty.UnknownKvStateID;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.memory.MemValueState;
+import org.apache.flink.runtime.state.heap.HeapValueState;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.MathUtils;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -237,10 +243,22 @@ public class QueryableStateClientTest {
                KvStateClient networkClient = null;
                AtomicKvStateRequestStats networkClientStats = new 
AtomicKvStateRequestStats();
 
+               MemoryStateBackend backend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+
+               KeyedStateBackend<Integer> keyedStateBackend = 
backend.createKeyedStateBackend(dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               new HashKeyGroupAssigner<Integer>(1),
+                               new KeyGroupRange(0, 0),
+                               new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
+
+
                try {
                        KvStateRegistry[] registries = new 
KvStateRegistry[numServers];
                        KvStateID[] kvStateIds = new KvStateID[numServers];
-                       List<MemValueState<Integer, VoidNamespace, Integer>> 
kvStates = new ArrayList<>();
+                       List<HeapValueState<Integer, VoidNamespace, Integer>> 
kvStates = new ArrayList<>();
 
                        // Start the servers
                        for (int i = 0; i < numServers; i++) {
@@ -249,11 +267,14 @@ public class QueryableStateClientTest {
                                servers[i] = new 
KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], 
serverStats[i]);
                                servers[i].start();
 
+
                                // Register state
-                               MemValueState<Integer, VoidNamespace, Integer> 
kvState = new MemValueState<>(
+                               HeapValueState<Integer, VoidNamespace, Integer> 
kvState = new HeapValueState<>(
+                                               keyedStateBackend,
+                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null),
+                                               new StateTable<Integer, 
VoidNamespace, Integer>(IntSerializer.INSTANCE, 
VoidNamespaceSerializer.INSTANCE,  new KeyGroupRange(0, 1)),
                                                IntSerializer.INSTANCE,
-                                               
VoidNamespaceSerializer.INSTANCE,
-                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null));
+                                               
VoidNamespaceSerializer.INSTANCE);
 
                                kvStates.add(kvState);
 
@@ -271,9 +292,9 @@ public class QueryableStateClientTest {
                                int targetKeyGroupIndex = 
MathUtils.murmurHash(key) % numServers;
                                expectedRequests[targetKeyGroupIndex]++;
 
-                               MemValueState<Integer, VoidNamespace, Integer> 
kvState = kvStates.get(targetKeyGroupIndex);
+                               HeapValueState<Integer, VoidNamespace, Integer> 
kvState = kvStates.get(targetKeyGroupIndex);
 
-                               kvState.setCurrentKey(key);
+                               keyedStateBackend.setCurrentKey(key);
                                
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
                                kvState.update(1337 + key);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index ac03f94..796481c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -30,18 +30,25 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.netty.message.KvStateRequest;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.memory.MemValueState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.NetUtils;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -526,6 +533,20 @@ public class KvStateClientTest {
 
                final int batchSize = 16;
 
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               KvStateRegistry dummyRegistry = new KvStateRegistry();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(dummyRegistry);
+               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               new HashKeyGroupAssigner<Integer>(1),
+                               new KeyGroupRange(0, 0),
+                               dummyRegistry.createTaskRegistry(new JobID(), 
new JobVertexID()));
+
+
                final FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
 
                AtomicKvStateRequestStats clientStats = new 
AtomicKvStateRequestStats();
@@ -542,11 +563,6 @@ public class KvStateClientTest {
                        ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
                        desc.setQueryable("any");
 
-                       MemValueState<Integer, VoidNamespace, Integer> kvState 
= new MemValueState<>(
-                                       IntSerializer.INSTANCE,
-                                       VoidNamespaceSerializer.INSTANCE,
-                                       desc);
-
                        // Create servers
                        KvStateRegistry[] registry = new 
KvStateRegistry[numServers];
                        AtomicKvStateRequestStats[] serverStats = new 
AtomicKvStateRequestStats[numServers];
@@ -565,10 +581,17 @@ public class KvStateClientTest {
 
                                server[i].start();
 
+                               backend.setCurrentKey(1010 + i);
+
                                // Value per server
-                               kvState.setCurrentKey(1010 + i);
-                               
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
-                               kvState.update(201 + i);
+                               ValueState<Integer> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE,
+                                               
VoidNamespaceSerializer.INSTANCE,
+                                               desc);
+
+                               state.update(201 + i);
+
+                               // we know it must be a KvStat but this is not 
exposed to the user via State
+                               KvState<?> kvState = (KvState<?>) state;
 
                                // Register KvState (one state instance for all 
server)
                                ids[i] = registry[i].registerKvState(new 
JobID(), new JobVertexID(), 0, "any", kvState);

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 6ad7ece..3d2e8b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -24,21 +24,28 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.memory.MemValueState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -80,28 +87,34 @@ public class KvStateServerHandlerTest {
 
                // Register state
                ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
-               desc.setQueryable("any");
+               desc.setQueryable("vanilla");
 
-               MemValueState<Integer, VoidNamespace, Integer> kvState = new 
MemValueState<>(
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
                                IntSerializer.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               desc);
+                               new HashKeyGroupAssigner<Integer>(1),
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 
-               KvStateID kvStateId = registry.registerKvState(
-                               new JobID(),
-                               new JobVertexID(),
-                               0,
-                               "vanilla",
-                               kvState);
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
 
                // Update the KvState and request it
                int expectedValue = 712828289;
 
                int key = 99812822;
-               kvState.setCurrentKey(key);
-               kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+               backend.setCurrentKey(key);
+               ValueState<Integer> state = backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE,
+                               desc);
 
-               kvState.update(expectedValue);
+               state.update(expectedValue);
 
                byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
                                key,
@@ -110,10 +123,13 @@ public class KvStateServerHandlerTest {
                                VoidNamespaceSerializer.INSTANCE);
 
                long requestId = Integer.MAX_VALUE + 182828L;
+
+               assertTrue(registryListener.registrationName.equals("vanilla"));
+
                ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
                                channel.alloc(),
                                requestId,
-                               kvStateId,
+                               registryListener.kvStateId,
                                serializedKeyAndNamespace);
 
                // Write the request and wait for the response
@@ -184,21 +200,26 @@ public class KvStateServerHandlerTest {
                KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               new HashKeyGroupAssigner<Integer>(1),
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
+
                // Register state
                ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
-               desc.setQueryable("any");
+               desc.setQueryable("vanilla");
 
-               MemValueState<Integer, VoidNamespace, Integer> kvState = new 
MemValueState<>(
-                               IntSerializer.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               desc);
-
-               KvStateID kvStateId = registry.registerKvState(
-                               new JobID(),
-                               new JobVertexID(),
-                               0,
-                               "vanilla",
-                               kvState);
+               backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
 
                byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
                                1238283,
@@ -207,10 +228,13 @@ public class KvStateServerHandlerTest {
                                VoidNamespaceSerializer.INSTANCE);
 
                long requestId = Integer.MAX_VALUE + 22982L;
+
+               assertTrue(registryListener.registrationName.equals("vanilla"));
+
                ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
                                channel.alloc(),
                                requestId,
-                               kvStateId,
+                               registryListener.kvStateId,
                                serializedKeyAndNamespace);
 
                // Write the request and wait for the response
@@ -225,6 +249,8 @@ public class KvStateServerHandlerTest {
 
                assertEquals(requestId, response.getRequestId());
 
+               System.out.println("RESPOINSE: " + response);
+
                assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKeyOrNamespace);
 
                assertEquals(1, stats.getNumRequests());
@@ -244,7 +270,7 @@ public class KvStateServerHandlerTest {
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                // Failing KvState
-               KvState<?, ?, ?, ?, ?> kvState = mock(KvState.class);
+               KvState<?> kvState = mock(KvState.class);
                when(kvState.getSerializedValue(any(byte[].class)))
                                .thenThrow(new RuntimeException("Expected test 
Exception"));
 
@@ -320,26 +346,33 @@ public class KvStateServerHandlerTest {
                KvStateServerHandler handler = new 
KvStateServerHandler(registry, closedExecutor, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               new HashKeyGroupAssigner<Integer>(1),
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
+
                // Register state
                ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
-               desc.setQueryable("any");
+               desc.setQueryable("vanilla");
 
-               MemValueState<Integer, VoidNamespace, Integer> kvState = new 
MemValueState<>(
-                               IntSerializer.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               desc);
+               backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
 
-               KvStateID kvStateId = registry.registerKvState(
-                               new JobID(),
-                               new JobVertexID(),
-                               0,
-                               "vanilla",
-                               kvState);
+               assertTrue(registryListener.registrationName.equals("vanilla"));
 
                ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
                                channel.alloc(),
                                282872,
-                               kvStateId,
+                               registryListener.kvStateId,
                                new byte[0]);
 
                // Write the request and wait for the response
@@ -451,28 +484,35 @@ public class KvStateServerHandlerTest {
                KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               new HashKeyGroupAssigner<Integer>(1),
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
+
                // Register state
                ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
-               desc.setQueryable("any");
+               desc.setQueryable("vanilla");
 
-               MemValueState<Integer, VoidNamespace, Integer> kvState = new 
MemValueState<>(
-                               IntSerializer.INSTANCE,
+               ValueState<Integer> state = backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
                                VoidNamespaceSerializer.INSTANCE,
                                desc);
 
-               KvStateID kvStateId = registry.registerKvState(
-                               new JobID(),
-                               new JobVertexID(),
-                               0,
-                               "vanilla",
-                               kvState);
-
                int key = 99812822;
 
                // Update the KvState
-               kvState.setCurrentKey(key);
-               kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
-               kvState.update(712828289);
+               backend.setCurrentKey(key);
+               state.update(712828289);
 
                byte[] wrongKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
                                "wrong-key-type",
@@ -486,10 +526,11 @@ public class KvStateServerHandlerTest {
                                "wrong-namespace-type",
                                StringSerializer.INSTANCE);
 
+               assertTrue(registryListener.registrationName.equals("vanilla"));
                ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
                                channel.alloc(),
                                182828,
-                               kvStateId,
+                               registryListener.kvStateId,
                                wrongKeyAndNamespace);
 
                // Write the request and wait for the response
@@ -508,7 +549,7 @@ public class KvStateServerHandlerTest {
                request = KvStateRequestSerializer.serializeKvStateRequest(
                                channel.alloc(),
                                182829,
-                               kvStateId,
+                               registryListener.kvStateId,
                                wrongNamespace);
 
                // Write the request and wait for the response
@@ -538,22 +579,30 @@ public class KvStateServerHandlerTest {
                KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               new HashKeyGroupAssigner<Integer>(1),
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
+
                // Register state
                ValueStateDescriptor<byte[]> desc = new 
ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE, null);
-               desc.setQueryable("any");
+               desc.setQueryable("vanilla");
 
-               MemValueState<Integer, VoidNamespace, byte[]> kvState = new 
MemValueState<>(
-                               IntSerializer.INSTANCE,
+               ValueState<byte[]> state = backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
                                VoidNamespaceSerializer.INSTANCE,
                                desc);
 
-               KvStateID kvStateId = registry.registerKvState(
-                               new JobID(),
-                               new JobVertexID(),
-                               0,
-                               "vanilla",
-                               kvState);
-
                // Update KvState
                byte[] bytes = new byte[2 * 
channel.config().getWriteBufferHighWaterMark()];
 
@@ -563,9 +612,8 @@ public class KvStateServerHandlerTest {
                }
 
                int key = 99812822;
-               kvState.setCurrentKey(key);
-               kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
-               kvState.update(bytes);
+               backend.setCurrentKey(key);
+               state.update(bytes);
 
                // Request
                byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
@@ -575,10 +623,13 @@ public class KvStateServerHandlerTest {
                                VoidNamespaceSerializer.INSTANCE);
 
                long requestId = Integer.MAX_VALUE + 182828L;
+
+               assertTrue(registryListener.registrationName.equals("vanilla"));
+
                ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
                                channel.alloc(),
                                requestId,
-                               kvStateId,
+                               registryListener.kvStateId,
                                serializedKeyAndNamespace);
 
                // Write the request and wait for the response
@@ -619,4 +670,35 @@ public class KvStateServerHandlerTest {
        private ChannelHandler getFrameDecoder() {
                return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 
4, 0, 4);
        }
+
+       /**
+        * A listener that keeps the last updated KvState information so that a 
test
+        * can retrieve it.
+        */
+       static class TestRegistryListener implements KvStateRegistryListener {
+               volatile JobVertexID jobVertexID;
+               volatile int keyGroupIndex;
+               volatile String registrationName;
+               volatile KvStateID kvStateId;
+
+               @Override
+               public void notifyKvStateRegistered(JobID jobId,
+                               JobVertexID jobVertexId,
+                               int keyGroupIndex,
+                               String registrationName,
+                               KvStateID kvStateId) {
+                       this.jobVertexID = jobVertexId;
+                       this.keyGroupIndex = keyGroupIndex;
+                       this.registrationName = registrationName;
+                       this.kvStateId = kvStateId;
+               }
+
+               @Override
+               public void notifyKvStateUnregistered(JobID jobId,
+                               JobVertexID jobVertexId,
+                               int keyGroupIndex,
+                               String registrationName) {
+
+               }
+       }
 }

Reply via email to