http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
new file mode 100644
index 0000000..cc13a72
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.UUID;
+
+/**
+ * {@link org.apache.flink.runtime.state.CheckpointStreamFactory} that 
produces streams that
+ * write to a {@link FileSystem}.
+ *
+ * <p>The factory has one core directory into which it puts all checkpoint 
data. Inside that
+ * directory, it creates a directory per job, inside which each checkpoint 
gets a directory, with
+ * files for each state, for example:
+ *
+ * {@code 
hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8
 }
+ */
+public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FsCheckpointStreamFactory.class);
+
+       /** Maximum size of state that is stored with the metadata, rather than 
in files */
+       private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+
+       /** Default size for the write buffer */
+       private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
+
+       /** State below this size will be stored as part of the metadata, 
rather than in files */
+       private final int fileStateThreshold;
+
+       /** The (job specific) directory into which this initialized instance of the 
factory stores its data */
+       private final Path checkpointDirectory;
+
+       /** Cached handle to the file system for file operations */
+       private final FileSystem filesystem;
+
+       /**
+        * Creates a new checkpoint stream factory that stores its data in a job-specific
+        * sub-directory of the given base path.
+        *
+        * <p>The file system is resolved from the base path, and the directory
+        * {@code <checkpointDataUri>/<jobId>} is created through it.
+        *
+        * @param checkpointDataUri The base path (file system scheme, optionally authority, and
+        *                          directory) below which the checkpoint data is stored.
+        * @param jobId The ID of the job; its string form is the name of the job's sub-directory.
+        * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
+        *                               rather than in files. Must not be negative and must not
+        *                               exceed {@code MAX_FILE_STATE_THRESHOLD}.
+        *
+        * @throws IllegalArgumentException Thrown, if the threshold is outside the permitted range.
+        * @throws IOException Propagated from resolving the file system or creating the directory.
+        */
+       public FsCheckpointStreamFactory(
+                       Path checkpointDataUri,
+                       JobID jobId,
+                       int fileStateSizeThreshold) throws IOException {
+
+               final boolean thresholdNegative = fileStateSizeThreshold < 0;
+               if (thresholdNegative) {
+                       throw new IllegalArgumentException(
+                               "The threshold for file state size must be zero or larger.");
+               }
+
+               final boolean thresholdTooLarge = fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD;
+               if (thresholdTooLarge) {
+                       throw new IllegalArgumentException(
+                               "The threshold for file state size cannot be larger than " + MAX_FILE_STATE_THRESHOLD);
+               }
+
+               this.fileStateThreshold = fileStateSizeThreshold;
+
+               // every job writes below its own sub-directory of the base path
+               final Path jobDirectory = new Path(checkpointDataUri, jobId.toString());
+               LOG.info("Initializing file stream factory to URI {}.", jobDirectory);
+
+               this.filesystem = checkpointDataUri.getFileSystem();
+               this.filesystem.mkdirs(jobDirectory);
+
+               this.checkpointDirectory = jobDirectory;
+       }
+
+       /**
+        * Closes this factory. This implementation does nothing.
+        */
+       @Override
+       public void close() throws Exception {
+               // intentionally empty
+       }
+
+       /**
+        * Creates an output stream that writes the state of the given checkpoint below the
+        * checkpoint's {@code chk-<checkpointID>} directory.
+        *
+        * @param checkpointID The ID of the checkpoint; determines the target directory.
+        * @param timestamp The checkpoint timestamp (not used by this implementation).
+        * @return The stream into which the state is written.
+        */
+       @Override
+       public FsCheckpointStateOutputStream createCheckpointStateOutputStream(
+                       long checkpointID, long timestamp) throws Exception {
+
+               checkFileSystemInitialized();
+
+               // the stream requires a buffer that is at least as large as the threshold
+               final int writeBufferSize = fileStateThreshold > DEFAULT_WRITE_BUFFER_SIZE
+                               ? fileStateThreshold
+                               : DEFAULT_WRITE_BUFFER_SIZE;
+               final Path targetDirectory = createCheckpointDirPath(checkpointID);
+
+               return new FsCheckpointStateOutputStream(
+                               targetDirectory, filesystem, writeBufferSize, fileStateThreshold);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Verifies that both the file system handle and the checkpoint directory are set.
+        *
+        * @throws IllegalStateException Thrown, if either of the two is {@code null}.
+        */
+       private void checkFileSystemInitialized() throws IllegalStateException {
+               final boolean initialized = filesystem != null && checkpointDirectory != null;
+               if (!initialized) {
+                       throw new IllegalStateException(
+                               "filesystem has not been re-initialized after deserialization");
+               }
+       }
+
+       /**
+        * Builds the path of the directory for the given checkpoint, which is the child
+        * {@code chk-<checkpointID>} of the job's checkpoint directory.
+        */
+       private Path createCheckpointDirPath(long checkpointID) {
+               final String directoryName = "chk-" + checkpointID;
+               return new Path(checkpointDirectory, directoryName);
+       }
+
+       /**
+        * Describes this factory by the directory it writes to.
+        */
+       @Override
+       public String toString() {
+               final StringBuilder description = new StringBuilder("File Stream Factory @ ");
+               description.append(checkpointDirectory);
+               return description.toString();
+       }
+
+       /**
+        * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that 
writes into a file and
+        * returns a {@link StreamStateHandle} upon closing.
+        */
+       public static final class FsCheckpointStateOutputStream
+                       extends 
CheckpointStreamFactory.CheckpointStateOutputStream {
+
+               private final byte[] writeBuffer;
+
+               private int pos;
+
+               private FSDataOutputStream outStream;
+               
+               private final int localStateThreshold;
+
+               private final Path basePath;
+
+               private final FileSystem fs;
+               
+               private Path statePath;
+               
+               private boolean closed;
+
+               private boolean isEmpty = true;
+
+               /**
+                * Creates a stream that buffers writes in memory and writes them to a file below the
+                * given directory once the buffer is flushed.
+                *
+                * @param basePath The directory in which the state file is created.
+                * @param fs The file system used to create and delete files.
+                * @param bufferSize The size of the write buffer, in bytes.
+                * @param localStateThreshold State up to this size is returned as an in-memory handle
+                *                            rather than written to a file.
+                *
+                * @throws IllegalArgumentException Thrown, if the buffer is smaller than the threshold.
+                */
+               public FsCheckpointStateOutputStream(
+                                       Path basePath, FileSystem fs,
+                                       int bufferSize, int localStateThreshold)
+               {
+                       // state below the threshold must fit into the buffer entirely
+                       if (localStateThreshold > bufferSize) {
+                               throw new IllegalArgumentException();
+                       }
+
+                       this.writeBuffer = new byte[bufferSize];
+                       this.localStateThreshold = localStateThreshold;
+                       this.fs = fs;
+                       this.basePath = basePath;
+               }
+
+
+               /**
+                * Writes a single byte into the write buffer, flushing the buffer first if it is full.
+                */
+               @Override
+               public void write(int b) throws IOException {
+                       final boolean bufferFull = pos >= writeBuffer.length;
+                       if (bufferFull) {
+                               // drain the buffer to make room for the new byte
+                               flush();
+                       }
+
+                       final byte value = (byte) b;
+                       writeBuffer[pos++] = value;
+
+                       isEmpty = false;
+               }
+
+               /**
+                * Writes {@code len} bytes from {@code b}, starting at {@code off}.
+                *
+                * <p>Requests of at least half the buffer size bypass the buffer: the buffer is flushed
+                * and the bytes go directly to the file stream. Smaller requests are copied into the
+                * buffer, with one intermediate flush if they do not fit into the remaining space.
+                *
+                * <p>NOTE(review): {@code flush()} does nothing once the stream is closed, so writing
+                * after close fails with a runtime exception rather than an {@link IOException} —
+                * confirm that no caller relies on writing to a closed stream.
+                */
+               @Override
+               public void write(byte[] b, int off, int len) throws IOException {
+                       final boolean bypassBuffer = len >= writeBuffer.length / 2;
+
+                       if (bypassBuffer) {
+                               // push out what is buffered so far, then hand the bytes to the file stream
+                               flush();
+                               outStream.write(b, off, len);
+                       }
+                       else {
+                               final int free = writeBuffer.length - pos;
+                               if (free < len) {
+                                       // fill the buffer up to its end ...
+                                       System.arraycopy(b, off, writeBuffer, pos, free);
+                                       off += free;
+                                       len -= free;
+                                       pos += free;
+
+                                       // ... and empty it so that the rest fits
+                                       flush();
+                               }
+
+                               // buffer the (remaining) bytes
+                               System.arraycopy(b, off, writeBuffer, pos, len);
+                               pos += len;
+                       }
+
+                       isEmpty = false;
+               }
+
+               /**
+                * Returns the number of bytes written to this stream so far.
+                *
+                * <p>The position is the sum of the bytes already handed to the file stream (zero while
+                * no file has been opened) and the bytes still held in the write buffer. Counting only
+                * the file stream's position would under-report the position between two flushes.
+                *
+                * @return The current write position of this stream.
+                * @throws IOException Propagated from querying the position of the file stream.
+                */
+               @Override
+               public long getPos() throws IOException {
+                       final long flushedBytes = (outStream == null) ? 0L : outStream.getPos();
+                       return flushedBytes + pos;
+               }
+
+               /**
+                * Writes the buffered bytes to the state file. Does nothing if the stream is closed.
+                *
+                * <p>The first flush creates the checkpoint directory and opens the state file under a
+                * random UUID name, making up to ten attempts.
+                *
+                * @throws IOException Thrown, if the state file could not be opened, or propagated from
+                *                     creating the directory or writing to the file stream.
+                */
+               @Override
+               public void flush() throws IOException {
+                       if (closed) {
+                               return;
+                       }
+
+                       if (outStream == null) {
+                               // first flush: the directory of this checkpoint may not exist yet
+                               fs.mkdirs(basePath);
+
+                               Exception lastFailure = null;
+                               boolean opened = false;
+                               for (int attemptsLeft = 10; attemptsLeft > 0 && !opened; attemptsLeft--) {
+                                       try {
+                                               statePath = new Path(basePath, UUID.randomUUID().toString());
+                                               outStream = fs.create(statePath, false);
+                                               opened = true;
+                                       }
+                                       catch (Exception e) {
+                                               // remember the failure and retry under a new file name
+                                               lastFailure = e;
+                                       }
+                               }
+
+                               if (outStream == null) {
+                                       throw new IOException("Could not open output stream for state backend", lastFailure);
+                               }
+                       }
+
+                       // hand the buffered bytes to the file stream and reset the buffer
+                       if (pos > 0) {
+                               outStream.write(writeBuffer, 0, pos);
+                               pos = 0;
+                       }
+               }
+
+               /**
+                * Forwards the sync request to the file stream.
+                *
+                * <p>NOTE(review): bytes still held in the write buffer are not flushed by this method,
+                * and the call fails with a {@link NullPointerException} while no file stream has been
+                * opened yet — confirm that callers flush before syncing.
+                */
+               @Override
+               public void sync() throws IOException {
+                       final FSDataOutputStream fileStream = outStream;
+                       fileStream.sync();
+               }
+
+               /**
+                * Closes the stream and discards its data (cleanup through the auto close feature, for
+                * example). Calling this method on an already closed stream has no effect.
+                *
+                * <p>If a state file was opened, the file stream is closed and the file is deleted.
+                * Afterwards, a non-recursive delete of the checkpoint directory is attempted; an
+                * {@link IOException} from that attempt is ignored.
+                *
+                * <p>This method throws no exception: failures to close or to delete are only logged.
+                * The deletion is attempted even if closing the file stream fails, so that a failed
+                * close does not leave the discarded file behind.
+                */
+               @Override
+               public void close() {
+                       if (closed) {
+                               return;
+                       }
+                       closed = true;
+
+                       if (outStream == null) {
+                               // no file was ever opened, so there is nothing to clean up
+                               return;
+                       }
+
+                       try {
+                               outStream.close();
+                       }
+                       catch (Exception e) {
+                               LOG.warn("Cannot close discarded state stream for " + statePath, e);
+                       }
+
+                       try {
+                               fs.delete(statePath, false);
+
+                               // attempt to delete the parent (expected to fail if the parent has more files)
+                               try {
+                                       fs.delete(basePath, false);
+                               } catch (IOException ignored) {}
+                       }
+                       catch (Exception e) {
+                               LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e);
+                       }
+               }
+
+               /**
+                * Closes the stream and returns a handle to the data that was written.
+                *
+                * <p>If no file has been opened and the buffered data does not exceed the local state
+                * threshold, the handle holds a copy of the bytes in memory. Otherwise the data is
+                * flushed to the state file and the handle points to that file.
+                *
+                * @return The state handle, or {@code null} if nothing was written to this stream.
+                * @throws IOException Thrown, if the stream was already closed, or propagated from
+                *                     flushing or closing the file stream.
+                */
+               @Override
+               public StreamStateHandle closeAndGetHandle() throws IOException {
+                       if (isEmpty) {
+                               return null;
+                       }
+
+                       synchronized (this) {
+                               if (closed) {
+                                       throw new IOException("Stream has already been closed and discarded.");
+                               }
+
+                               final boolean keepInMemory = outStream == null && pos <= localStateThreshold;
+                               if (keepInMemory) {
+                                       // small state: hand out the buffered bytes directly
+                                       closed = true;
+                                       return new ByteStreamStateHandle(Arrays.copyOf(writeBuffer, pos));
+                               }
+
+                               // larger state: make sure everything is in the file before handing it out
+                               flush();
+                               outStream.close();
+                               closed = true;
+                               return new FileStateHandle(statePath);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
deleted file mode 100644
index 2fbbdc9..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Heap-backed partitioned {@link FoldingState} that is
- * snapshotted into files.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that can be folded into the state.
- * @param <ACC> The type of the value in the folding state.
- */
-public class FsFoldingState<K, N, T, ACC>
-       extends AbstractFsState<K, N, ACC, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>>
-       implements FoldingState<T, ACC> {
-
-       private final FoldFunction<T, ACC> foldFunction;
-
-       /**
-        * Creates a new and empty partitioned state.
-        *
-        * @param backend The file system state backend backing snapshots of 
this state
-        * @param keySerializer The serializer for the key.
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        *                           and can create a default state value.
-        */
-       public FsFoldingState(FsStateBackend backend,
-               TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               FoldingStateDescriptor<T, ACC> stateDesc) {
-               super(backend, keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc);
-               this.foldFunction = stateDesc.getFoldFunction();
-       }
-
-       /**
-        * Creates a new key/value state with the given state contents.
-        * This method is used to re-create key/value state with existing data, 
for example from
-        * a snapshot.
-        *
-        * @param backend The file system state backend backing snapshots of 
this state
-        * @param keySerializer The serializer for the key.
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        * and can create a default state value.
-        * @param state The map of key/value pairs to initialize the state with.
-        */
-       public FsFoldingState(FsStateBackend backend,
-               TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               FoldingStateDescriptor<T, ACC> stateDesc,
-               HashMap<N, Map<K, ACC>> state) {
-               super(backend, keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc, state);
-               this.foldFunction = stateDesc.getFoldFunction();
-       }
-
-       @Override
-       public ACC get() {
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = state.get(currentNamespace);
-               }
-               if (currentNSState != null) {
-                       Preconditions.checkState(currentKey != null, "No key 
set");
-                       return currentNSState.get(currentKey);
-               } else {
-                       return null;
-               }
-       }
-
-       @Override
-       public void add(T value) throws IOException {
-               Preconditions.checkState(currentKey != null, "No key set");
-
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = createNewNamespaceMap();
-                       state.put(currentNamespace, currentNSState);
-               }
-
-               ACC currentValue = currentNSState.get(currentKey);
-               try {
-                       if (currentValue == null) {
-                               currentNSState.put(currentKey, 
foldFunction.fold(stateDesc.getDefaultValue(), value));
-                       } else {
-                               currentNSState.put(currentKey, 
foldFunction.fold(currentValue, value));
-
-                       }
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not add value to 
folding state.", e);
-               }
-       }
-
-       @Override
-       public KvStateSnapshot<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, FsStateBackend> createHeapSnapshot(Path 
filePath) {
-               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), stateSerializer, stateDesc, filePath);
-       }
-
-       @Override
-       public byte[] getSerializedValue(K key, N namespace) throws Exception {
-               Preconditions.checkNotNull(key, "Key");
-               Preconditions.checkNotNull(namespace, "Namespace");
-
-               Map<K, ACC> stateByKey = state.get(namespace);
-
-               if (stateByKey != null) {
-                       return 
KvStateRequestSerializer.serializeValue(stateByKey.get(key), 
stateDesc.getSerializer());
-               } else {
-                       return null;
-               }
-       }
-
-       public static class Snapshot<K, N, T, ACC> extends 
AbstractFsStateSnapshot<K, N, ACC, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>> {
-               private static final long serialVersionUID = 1L;
-
-               public Snapshot(TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<ACC> stateSerializer,
-                       FoldingStateDescriptor<T, ACC> stateDescs,
-                       Path filePath) {
-                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, filePath);
-               }
-
-               @Override
-               public KvState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, FsStateBackend> createFsState(FsStateBackend 
backend, HashMap<N, Map<K, ACC>> stateMap) {
-                       return new FsFoldingState<>(backend, keySerializer, 
namespaceSerializer, stateDesc, stateMap);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
deleted file mode 100644
index dbef900..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.ArrayListSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} 
that is snapshotted
- * into files.
- * 
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of the value.
- */
-public class FsListState<K, N, V>
-       extends AbstractFsState<K, N, ArrayList<V>, ListState<V>, 
ListStateDescriptor<V>>
-       implements ListState<V> {
-
-       /**
-        * Creates a new and empty partitioned state.
-        *
-        * @param keySerializer The serializer for the key.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        * and can create a default state value.
-        * @param backend The file system state backend backing snapshots of 
this state
-        */
-       public FsListState(FsStateBackend backend,
-               TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ListStateDescriptor<V> stateDesc) {
-               super(backend, keySerializer, namespaceSerializer, new 
ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc);
-       }
-
-       /**
-        * Creates a new key/value state with the given state contents.
-        * This method is used to re-create key/value state with existing data, 
for example from
-        * a snapshot.
-        *
-        * @param keySerializer The serializer for the key.
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        *                           and can create a default state value.
-        * @param state The map of key/value pairs to initialize the state with.
-        * @param backend The file system state backend backing snapshots of 
this state
-        */
-       public FsListState(FsStateBackend backend,
-               TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ListStateDescriptor<V> stateDesc,
-               HashMap<N, Map<K, ArrayList<V>>> state) {
-               super(backend, keySerializer, namespaceSerializer, new 
ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc, state);
-       }
-
-
-       @Override
-       public Iterable<V> get() {
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = state.get(currentNamespace);
-               }
-               if (currentNSState != null) {
-                       Preconditions.checkState(currentKey != null, "No key 
set");
-                       return currentNSState.get(currentKey);
-               } else {
-                       return null;
-               }
-       }
-
-       @Override
-       public void add(V value) {
-               Preconditions.checkState(currentKey != null, "No key set");
-
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = createNewNamespaceMap();
-                       state.put(currentNamespace, currentNSState);
-               }
-
-               ArrayList<V> list = currentNSState.get(currentKey);
-               if (list == null) {
-                       list = new ArrayList<>();
-                       currentNSState.put(currentKey, list);
-               }
-               list.add(value);
-       }
-       
-       @Override
-       public KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, 
FsStateBackend> createHeapSnapshot(Path filePath) {
-               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), new ArrayListSerializer<>(stateDesc.getSerializer()), 
stateDesc, filePath);
-       }
-
-       @Override
-       public byte[] getSerializedValue(K key, N namespace) throws Exception {
-               Preconditions.checkNotNull(key, "Key");
-               Preconditions.checkNotNull(namespace, "Namespace");
-
-               Map<K, ArrayList<V>> stateByKey = state.get(namespace);
-               if (stateByKey != null) {
-                       return 
KvStateRequestSerializer.serializeList(stateByKey.get(key), 
stateDesc.getSerializer());
-               } else {
-                       return null;
-               }
-       }
-
-       public static class Snapshot<K, N, V> extends 
AbstractFsStateSnapshot<K, N, ArrayList<V>, ListState<V>, 
ListStateDescriptor<V>> {
-               private static final long serialVersionUID = 1L;
-
-               public Snapshot(TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<ArrayList<V>> stateSerializer,
-                       ListStateDescriptor<V> stateDescs,
-                       Path filePath) {
-                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, filePath);
-               }
-
-               @Override
-               public KvState<K, N, ListState<V>, ListStateDescriptor<V>, 
FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, 
ArrayList<V>>> stateMap) {
-                       return new FsListState<>(backend, keySerializer, 
namespaceSerializer, stateDesc, stateMap);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java
deleted file mode 100644
index bb389d9..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Heap-backed partitioned {@link 
org.apache.flink.api.common.state.ReducingState} that is
- * snapshotted into files.
- * 
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of the value.
- */
-public class FsReducingState<K, N, V>
-       extends AbstractFsState<K, N, V, ReducingState<V>, 
ReducingStateDescriptor<V>>
-       implements ReducingState<V> {
-
-       private final ReduceFunction<V> reduceFunction;
-
-       /**
-        * Creates a new and empty partitioned state.
-        *
-        * @param backend The file system state backend backing snapshots of 
this state
-        * @param keySerializer The serializer for the key.
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        *                           and can create a default state value.
-        */
-       public FsReducingState(FsStateBackend backend,
-               TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ReducingStateDescriptor<V> stateDesc) {
-               super(backend, keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc);
-               this.reduceFunction = stateDesc.getReduceFunction();
-       }
-
-       /**
-        * Creates a new key/value state with the given state contents.
-        * This method is used to re-create key/value state with existing data, 
for example from
-        * a snapshot.
-        *
-        * @param backend The file system state backend backing snapshots of 
this state
-        * @param keySerializer The serializer for the key.
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-*                           and can create a default state value.
-        * @param state The map of key/value pairs to initialize the state with.
-        */
-       public FsReducingState(FsStateBackend backend,
-               TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ReducingStateDescriptor<V> stateDesc,
-               HashMap<N, Map<K, V>> state) {
-               super(backend, keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc, state);
-               this.reduceFunction = stateDesc.getReduceFunction();
-       }
-
-
-       @Override
-       public V get() {
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = state.get(currentNamespace);
-               }
-               if (currentNSState != null) {
-                       Preconditions.checkState(currentKey != null, "No key 
set");
-                       return currentNSState.get(currentKey);
-               }
-               return null;
-       }
-
-       @Override
-       public void add(V value) throws IOException {
-               Preconditions.checkState(currentKey != null, "No key set");
-
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = createNewNamespaceMap();
-                       state.put(currentNamespace, currentNSState);
-               }
-//             currentKeyState.merge(currentNamespace, value, new 
BiFunction<V, V, V>() {
-//                     @Override
-//                     public V apply(V v, V v2) {
-//                             try {
-//                                     return reduceFunction.reduce(v, v2);
-//                             } catch (Exception e) {
-//                                     return null;
-//                             }
-//                     }
-//             });
-               V currentValue = currentNSState.get(currentKey);
-               if (currentValue == null) {
-                       currentNSState.put(currentKey, value);
-               } else {
-                       try {
-                               currentNSState.put(currentKey, 
reduceFunction.reduce(currentValue, value));
-                       } catch (Exception e) {
-                               throw new RuntimeException("Could not add value 
to reducing state.", e);
-                       }
-               }
-       }
-       @Override
-       public KvStateSnapshot<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) {
-               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), stateSerializer, stateDesc, filePath);
-       }
-
-       @Override
-       public byte[] getSerializedValue(K key, N namespace) throws Exception {
-               Preconditions.checkNotNull(key, "Key");
-               Preconditions.checkNotNull(namespace, "Namespace");
-
-               Map<K, V> stateByKey = state.get(namespace);
-               if (stateByKey != null) {
-                       return 
KvStateRequestSerializer.serializeValue(stateByKey.get(key), 
stateDesc.getSerializer());
-               } else {
-                       return null;
-               }
-       }
-
-       public static class Snapshot<K, N, V> extends 
AbstractFsStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> {
-               private static final long serialVersionUID = 1L;
-
-               public Snapshot(TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<V> stateSerializer,
-                       ReducingStateDescriptor<V> stateDescs,
-                       Path filePath) {
-                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, filePath);
-               }
-
-               @Override
-               public KvState<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend 
backend, HashMap<N, Map<K, V>> stateMap) {
-                       return new FsReducingState<>(backend, keySerializer, 
namespaceSerializer, stateDesc, stateMap);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index a3f4682..5495244 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -18,30 +18,26 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.UUID;
+import java.util.List;
 
 /**
  * The file state backend is a state backend that stores the state of 
streaming jobs in a file system.
@@ -63,12 +59,8 @@ public class FsStateBackend extends AbstractStateBackend {
        public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
 
        /** Maximum size of state that is stored with the metadata, rather than 
in files */
-       public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+       private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
        
-       /** Default size for the write buffer */
-       private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
-       
-
        /** The path to the directory for the checkpoint data, including the 
file system
         * description via scheme and optional authority */
        private final Path basePath;
@@ -76,13 +68,6 @@ public class FsStateBackend extends AbstractStateBackend {
        /** State below this size will be stored as part of the metadata, 
rather than in files */
        private final int fileStateThreshold;
        
-       /** The directory (job specific) into this initialized instance of the 
backend stores its data */
-       private transient Path checkpointDirectory;
-
-       /** Cached handle to the file system for file operations */
-       private transient FileSystem filesystem;
-
-
        /**
         * Creates a new state backend that stores its checkpoint data in the 
file system and location
         * defined by the given URI.
@@ -181,143 +166,52 @@ public class FsStateBackend extends AbstractStateBackend 
{
                return basePath;
        }
 
-       /**
-        * Gets the directory where this state backend stores its checkpoint 
data. Will be null if
-        * the state backend has not been initialized.
-        *
-        * @return The directory where this state backend stores its checkpoint 
data.
-        */
-       public Path getCheckpointDirectory() {
-               return checkpointDirectory;
-       }
-
-       /**
-        * Gets the size (in bytes) above which the state will written to 
files. State whose size
-        * is below this threshold will be directly stored with the metadata
-        * (the state handles), rather than in files. This threshold helps to 
prevent an accumulation
-        * of small files for small states.
-        * 
-        * @return The threshold (in bytes) above which state is written to 
files.
-        */
-       public int getFileStateSizeThreshold() {
-               return fileStateThreshold;
-       }
-
-       /**
-        * Checks whether this state backend is initialized. Note that 
initialization does not carry
-        * across serialization. After each serialization, the state backend 
needs to be initialized.
-        *
-        * @return True, if the file state backend has been initialized, false 
otherwise.
-        */
-       public boolean isInitialized() {
-               return filesystem != null && checkpointDirectory != null;
-       }
-
-       /**
-        * Gets the file system handle for the file system that stores the 
state for this backend.
-        *
-        * @return This backend's file system handle.
-        */
-       public FileSystem getFileSystem() {
-               if (filesystem != null) {
-                       return filesystem;
-               }
-               else {
-                       throw new IllegalStateException("State backend has not 
been initialized.");
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  initialization and cleanup
        // 
------------------------------------------------------------------------
 
        @Override
-       public void initializeForJob(Environment env,
-               String operatorIdentifier,
-               TypeSerializer<?> keySerializer) throws Exception {
-               super.initializeForJob(env, operatorIdentifier, keySerializer);
-
-               Path dir = new Path(basePath, env.getJobID().toString());
-
-               LOG.info("Initializing file state backend to URI " + dir);
-
-               filesystem = basePath.getFileSystem();
-               filesystem.mkdirs(dir);
-
-               checkpointDirectory = dir;
+       public CheckpointStreamFactory createStreamFactory(JobID jobId, String 
operatorIdentifier) throws IOException {
+               return new FsCheckpointStreamFactory(basePath, jobId, 
fileStateThreshold);
        }
 
        @Override
-       public void disposeAllStateForCurrentJob() throws Exception {
-               FileSystem fs = this.filesystem;
-               Path dir = this.checkpointDirectory;
-
-               if (fs != null && dir != null) {
-                       this.filesystem = null;
-                       this.checkpointDirectory = null;
-                       fs.delete(dir, true);
-               }
-               else {
-                       throw new IllegalStateException("state backend has not 
been initialized");
-               }
+       public <K> KeyedStateBackend<K> createKeyedStateBackend(
+                       Environment env,
+                       JobID jobID,
+                       String operatorIdentifier,
+                       TypeSerializer<K> keySerializer,
+                       KeyGroupAssigner<K> keyGroupAssigner,
+                       KeyGroupRange keyGroupRange,
+                       TaskKvStateRegistry kvStateRegistry) throws Exception {
+               return new HeapKeyedStateBackend<>(
+                               kvStateRegistry,
+                               keySerializer,
+                               keyGroupAssigner,
+                               keyGroupRange);
        }
 
        @Override
-       public void close() throws Exception {}
-
-       // 
------------------------------------------------------------------------
-       //  state backend operations
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public <N, V> ValueState<V> createValueState(TypeSerializer<N> 
namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
-               return new FsValueState<>(this, keySerializer, 
namespaceSerializer, stateDesc);
-       }
-
-       @Override
-       public <N, T> ListState<T> createListState(TypeSerializer<N> 
namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
-               return new FsListState<>(this, keySerializer, 
namespaceSerializer, stateDesc);
-       }
-
-       @Override
-       public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> 
namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
-               return new FsReducingState<>(this, keySerializer, 
namespaceSerializer, stateDesc);
-       }
-
-       @Override
-       protected <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer,
-               FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-               return new FsFoldingState<>(this, keySerializer, 
namespaceSerializer, stateDesc);
-       }
-
-       @Override
-       public FsCheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
-               checkFileSystemInitialized();
-
-               Path checkpointDir = createCheckpointDirPath(checkpointID);
-               int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, 
fileStateThreshold);
-               return new FsCheckpointStateOutputStream(checkpointDir, 
filesystem, bufferSize, fileStateThreshold);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  utilities
-       // 
------------------------------------------------------------------------
-
-       private void checkFileSystemInitialized() throws IllegalStateException {
-               if (filesystem == null || checkpointDirectory == null) {
-                       throw new IllegalStateException("filesystem has not 
been re-initialized after deserialization");
-               }
-       }
-
-       private Path createCheckpointDirPath(long checkpointID) {
-               return new Path(checkpointDirectory, "chk-" + checkpointID);
+       public <K> KeyedStateBackend<K> restoreKeyedStateBackend(
+                       Environment env,
+                       JobID jobID,
+                       String operatorIdentifier,
+                       TypeSerializer<K> keySerializer,
+                       KeyGroupAssigner<K> keyGroupAssigner,
+                       KeyGroupRange keyGroupRange,
+                       List<KeyGroupsStateHandle> restoredState,
+                       TaskKvStateRegistry kvStateRegistry) throws Exception {
+               return new HeapKeyedStateBackend<>(
+                               kvStateRegistry,
+                               keySerializer,
+                               keyGroupAssigner,
+                               keyGroupRange,
+                               restoredState);
        }
 
        @Override
        public String toString() {
-               return checkpointDirectory == null ?
-                       "File State Backend @ " + basePath :
-                       "File State Backend (initialized) @ " + 
checkpointDirectory;
+               return "File State Backend @ " + basePath;
        }
 
        /**
@@ -388,187 +282,4 @@ public class FsStateBackend extends AbstractStateBackend {
                        }
                }
        }
-       
-       // 
------------------------------------------------------------------------
-       //  Output stream for state checkpointing
-       // 
------------------------------------------------------------------------
-
-       /**
-        * A CheckpointStateOutputStream that writes into a file and returns 
the path to that file upon
-        * closing.
-        */
-       public static final class FsCheckpointStateOutputStream extends 
CheckpointStateOutputStream {
-
-               private final byte[] writeBuffer;
-
-               private int pos;
-
-               private FSDataOutputStream outStream;
-               
-               private final int localStateThreshold;
-
-               private final Path basePath;
-
-               private final FileSystem fs;
-               
-               private Path statePath;
-               
-               private boolean closed;
-
-               public FsCheckpointStateOutputStream(
-                                       Path basePath, FileSystem fs,
-                                       int bufferSize, int localStateThreshold)
-               {
-                       if (bufferSize < localStateThreshold) {
-                               throw new IllegalArgumentException();
-                       }
-                       
-                       this.basePath = basePath;
-                       this.fs = fs;
-                       this.writeBuffer = new byte[bufferSize];
-                       this.localStateThreshold = localStateThreshold;
-               }
-
-
-               @Override
-               public void write(int b) throws IOException {
-                       if (pos >= writeBuffer.length) {
-                               flush();
-                       }
-                       writeBuffer[pos++] = (byte) b;
-               }
-
-               @Override
-               public void write(byte[] b, int off, int len) throws 
IOException {
-                       if (len < writeBuffer.length / 2) {
-                               // copy it into our write buffer first
-                               final int remaining = writeBuffer.length - pos;
-                               if (len > remaining) {
-                                       // copy as much as fits
-                                       System.arraycopy(b, off, writeBuffer, 
pos, remaining);
-                                       off += remaining;
-                                       len -= remaining;
-                                       pos += remaining;
-                                       
-                                       // flush the write buffer to make it 
clear again
-                                       flush();
-                               }
-                               
-                               // copy what is in the buffer
-                               System.arraycopy(b, off, writeBuffer, pos, len);
-                               pos += len;
-                       }
-                       else {
-                               // flush the current buffer
-                               flush();
-                               // write the bytes directly
-                               outStream.write(b, off, len);
-                       }
-               }
-
-               @Override
-               public void flush() throws IOException {
-                       if (!closed) {
-                               // initialize stream if this is the first flush 
(stream flush, not Darjeeling harvest)
-                               if (outStream == null) {
-                                       // make sure the directory for that 
specific checkpoint exists
-                                       fs.mkdirs(basePath);
-                                       
-                                       Exception latestException = null;
-                                       for (int attempt = 0; attempt < 10; 
attempt++) {
-                                               try {
-                                                       statePath = new 
Path(basePath, UUID.randomUUID().toString());
-                                                       outStream = 
fs.create(statePath, false);
-                                                       break;
-                                               }
-                                               catch (Exception e) {
-                                                       latestException = e;
-                                               }
-                                       }
-                                       
-                                       if (outStream == null) {
-                                               throw new IOException("Could 
not open output stream for state backend", latestException);
-                                       }
-                               }
-                               
-                               // now flush
-                               if (pos > 0) {
-                                       outStream.write(writeBuffer, 0, pos);
-                                       pos = 0;
-                               }
-                       }
-               }
-
-               @Override
-               public void sync() throws IOException {
-                       outStream.sync();
-               }
-
-               /**
-                * If the stream is only closed, we remove the produced file 
(cleanup through the auto close
-                * feature, for example). This method throws no exception if 
the deletion fails, but only
-                * logs the error.
-                */
-               @Override
-               public void close() {
-                       if (!closed) {
-                               closed = true;
-                               if (outStream != null) {
-                                       try {
-                                               outStream.close();
-                                               fs.delete(statePath, false);
-
-                                               // attempt to delete the parent 
(will fail and be ignored if the parent has more files)
-                                               try {
-                                                       fs.delete(basePath, 
false);
-                                               } catch (IOException ignored) {}
-                                       }
-                                       catch (Exception e) {
-                                               LOG.warn("Cannot delete closed 
and discarded state stream for " + statePath, e);
-                                       }
-                               }
-                       }
-               }
-
-               @Override
-               public StreamStateHandle closeAndGetHandle() throws IOException 
{
-                       synchronized (this) {
-                               if (!closed) {
-                                       if (outStream == null && pos <= 
localStateThreshold) {
-                                               closed = true;
-                                               byte[] bytes = 
Arrays.copyOf(writeBuffer, pos);
-                                               return new 
ByteStreamStateHandle(bytes);
-                                       }
-                                       else {
-                                               flush();
-                                               outStream.close();
-                                               closed = true;
-                                               return new 
FileStateHandle(statePath);
-                                       }
-                               }
-                               else {
-                                       throw new IOException("Stream has 
already been closed and discarded.");
-                               }
-                       }
-               }
-
-               /**
-                * Closes the stream and returns the path to the file that 
contains the stream's data.
-                * @return The path to the file that contains the stream's data.
-                * @throws IOException Thrown if the stream cannot be 
successfully closed.
-                */
-               public Path closeAndGetPath() throws IOException {
-                       synchronized (this) {
-                               if (!closed) {
-                                       closed = true;
-                                       flush();
-                                       outStream.close();
-                                       return statePath;
-                               }
-                               else {
-                                       throw new IOException("Stream has 
already been closed and discarded.");
-                               }
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
deleted file mode 100644
index 698bc1f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Heap-backed partitioned {@link 
org.apache.flink.api.common.state.ValueState} that is snapshotted
- * into files.
- * 
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of the value.
- */
-public class FsValueState<K, N, V>
-       extends AbstractFsState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
-       implements ValueState<V> {
-
-       /**
-        * Creates a new and empty key/value state.
-        * 
-        * @param keySerializer The serializer for the key.
-     * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        * and can create a default state value.
-        * @param backend The file system state backend backing snapshots of 
this state
-        */
-       public FsValueState(FsStateBackend backend,
-               TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ValueStateDescriptor<V> stateDesc) {
-               super(backend, keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc);
-       }
-
-       /**
-        * Creates a new key/value state with the given state contents.
-        * This method is used to re-create key/value state with existing data, 
for example from
-        * a snapshot.
-        * 
-        * @param keySerializer The serializer for the key.
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        *                           and can create a default state value.
-        * @param state The map of key/value pairs to initialize the state with.
-        * @param backend The file system state backend backing snapshots of 
this state
-        */
-       public FsValueState(FsStateBackend backend,
-               TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ValueStateDescriptor<V> stateDesc,
-               HashMap<N, Map<K, V>> state) {
-               super(backend, keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc, state);
-       }
-
-       @Override
-       public V value() {
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = state.get(currentNamespace);
-               }
-               if (currentNSState != null) {
-                       Preconditions.checkState(currentKey != null, "No key 
set");
-                       V value = currentNSState.get(currentKey);
-                       return value != null ? value : 
stateDesc.getDefaultValue();
-               }
-               return stateDesc.getDefaultValue();
-       }
-
-       @Override
-       public void update(V value) {
-               Preconditions.checkState(currentKey != null, "No key set");
-
-               if (value == null) {
-                       clear();
-                       return;
-               }
-
-               if (currentNSState == null) {
-                       Preconditions.checkState(currentNamespace != null, "No 
namespace set");
-                       currentNSState = createNewNamespaceMap();
-                       state.put(currentNamespace, currentNSState);
-               }
-
-               currentNSState.put(currentKey, value);
-       }
-
-       @Override
-       public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, 
FsStateBackend> createHeapSnapshot(Path filePath) {
-               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), stateSerializer, stateDesc, filePath);
-       }
-
-       @Override
-       public byte[] getSerializedValue(K key, N namespace) throws Exception {
-               Preconditions.checkNotNull(key, "Key");
-               Preconditions.checkNotNull(namespace, "Namespace");
-
-               Map<K, V> stateByKey = state.get(namespace);
-               V value = stateByKey != null ? stateByKey.get(key) : 
stateDesc.getDefaultValue();
-               if (value != null) {
-                       return KvStateRequestSerializer.serializeValue(value, 
stateDesc.getSerializer());
-               } else {
-                       return 
KvStateRequestSerializer.serializeValue(stateDesc.getDefaultValue(), 
stateDesc.getSerializer());
-               }
-       }
-
-       public static class Snapshot<K, N, V> extends 
AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
-               private static final long serialVersionUID = 1L;
-
-               public Snapshot(TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<V> stateSerializer,
-                       ValueStateDescriptor<V> stateDescs,
-                       Path filePath) {
-                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, filePath);
-               }
-
-               @Override
-               public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, 
FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> 
stateMap) {
-                       return new FsValueState<>(backend, keySerializer, 
namespaceSerializer, stateDesc, stateMap);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
new file mode 100644
index 0000000..9863c93
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base class for partitioned {@link State} implementations that are backed by regular
+ * heap hash maps held in a {@link StateTable}.
+ *
+ * <p>The state table is indexed by key group; each key group holds a map from namespace
+ * to a map from key to state value.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
+               implements KvState<N>, State {
+
+       /** Table containing the actual key/value pairs, organized per key group and namespace. */
+       protected final StateTable<K, N, SV> stateTable;
+
+       /** This holds the name of the state and can create an initial default value for the state. */
+       protected final SD stateDesc;
+
+       /** The current namespace, which the access methods will refer to. */
+       protected N currentNamespace = null;
+
+       /** The backend that created this state; supplies the current key and its key group. */
+       protected final KeyedStateBackend<K> backend;
+
+       /** Serializer for keys, used to decode serialized key/namespace queries. */
+       protected final TypeSerializer<K> keySerializer;
+
+       /** Serializer for namespaces, used to decode serialized key/namespace queries. */
+       protected final TypeSerializer<N> namespaceSerializer;
+
+       /**
+        * Creates a new key/value state on top of the given state table.
+        *
+        * @param backend The state backend that created this state.
+        * @param stateDesc The state identifier for the state. This contains name
+        *                           and can create a default state value.
+        * @param stateTable The state table to use in this key/value state. May contain initial state.
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespaces.
+        */
+       protected AbstractHeapState(
+                       KeyedStateBackend<K> backend,
+                       SD stateDesc,
+                       StateTable<K, N, SV> stateTable,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer) {
+
+               Preconditions.checkNotNull(stateTable, "State table must not be null.");
+
+               this.backend = backend;
+               this.stateDesc = stateDesc;
+               this.stateTable = stateTable;
+               this.keySerializer = keySerializer;
+               this.namespaceSerializer = namespaceSerializer;
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Removes the value stored for the backend's current key in the current namespace.
+        * If that was the last entry of the namespace, the now empty per-namespace map is
+        * removed from the key group as well.
+        */
+       @Override
+       public final void clear() {
+               Preconditions.checkState(currentNamespace != null, "No namespace set.");
+               Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+
+               Map<N, Map<K, SV>> namespaceMap =
+                               stateTable.get(backend.getCurrentKeyGroupIndex());
+
+               if (namespaceMap == null) {
+                       return;
+               }
+
+               Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
+
+               if (keyedMap == null) {
+                       return;
+               }
+
+               SV removed = keyedMap.remove(backend.getCurrentKey());
+
+               if (removed == null) {
+                       return;
+               }
+
+               if (!keyedMap.isEmpty()) {
+                       return;
+               }
+
+               // the namespace holds no more entries, drop its map
+               namespaceMap.remove(currentNamespace);
+       }
+
+       /**
+        * Sets the namespace that subsequent state accesses refer to.
+        *
+        * @param namespace The namespace to use, must not be null.
+        */
+       @Override
+       public final void setCurrentNamespace(N namespace) {
+               this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace must not be null.");
+       }
+
+       /**
+        * Looks up the state value for a serialized key and namespace and returns it in
+        * serialized form.
+        *
+        * @param serializedKeyAndNamespace The serialized key and namespace to look up.
+        * @return The serialized value, or null if there is no value for the key and namespace.
+        */
+       @Override
+       public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+               Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+
+               Tuple2<K, N> keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace(
+                               serializedKeyAndNamespace, keySerializer, namespaceSerializer);
+
+               return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
+       }
+
+       /**
+        * Looks up the state value for the given key and namespace and returns it in
+        * serialized form. The lookup is independent of the backend's current key and of
+        * the namespace set via {@link #setCurrentNamespace(Object)}.
+        *
+        * @param key The key to look up, must not be null.
+        * @param namespace The namespace to look up, must not be null.
+        * @return The serialized value, or null if there is no value for the key and namespace.
+        */
+       public byte[] getSerializedValue(K key, N namespace) throws Exception {
+               Preconditions.checkState(namespace != null, "No namespace given.");
+               Preconditions.checkState(key != null, "No key given.");
+
+               Map<N, Map<K, SV>> namespaceMap =
+                               stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key));
+
+               if (namespaceMap == null) {
+                       return null;
+               }
+
+               // use the requested namespace, not the one currently set for regular state access
+               Map<K, SV> keyedMap = namespaceMap.get(namespace);
+
+               if (keyedMap == null) {
+                       return null;
+               }
+
+               SV result = keyedMap.get(key);
+
+               if (result == null) {
+                       return null;
+               }
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer serializer = stateDesc.getSerializer();
+
+               return KvStateRequestSerializer.serializeValue(result, serializer);
+       }
+
+       /**
+        * Creates a new map for use in Heap based state.
+        *
+        * <p>If the state is queryable ({@link StateDescriptor#isQueryable()}), this
+        * will create a concurrent hash map instead of a regular one.
+        *
+        * @return A new, empty map.
+        */
+       protected <MK, MV> Map<MK, MV> createNewMap() {
+               if (stateDesc.isQueryable()) {
+                       return new ConcurrentHashMap<>();
+               } else {
+                       return new HashMap<>();
+               }
+       }
+
+       /**
+        * This should only be used for testing.
+        */
+       public StateTable<K, N, SV> getStateTable() {
+               return stateTable;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
new file mode 100644
index 0000000..1679122
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link FoldingState}. Values handed to {@link #add(Object)}
+ * are folded into a per-key, per-namespace accumulator kept in a {@link StateTable}.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ */
+public class HeapFoldingState<K, N, T, ACC>
+               extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+               implements FoldingState<T, ACC> {
+
+       /** The function that folds an incoming value into the accumulator. */
+       private final FoldFunction<T, ACC> foldFunction;
+
+       /**
+        * Creates a new folding state on top of the given state table.
+        *
+        * @param backend The state backend that created this state.
+        * @param stateDesc The state identifier for the state. This contains name
+        *                           and can create a default state value.
+        * @param stateTable The state table to use in this key/value state. May contain initial state.
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespaces.
+        */
+       public HeapFoldingState(
+                       KeyedStateBackend<K> backend,
+                       FoldingStateDescriptor<T, ACC> stateDesc,
+                       StateTable<K, N, ACC> stateTable,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer) {
+               super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+               this.foldFunction = stateDesc.getFoldFunction();
+       }
+
+       /**
+        * Returns the accumulator stored for the current key and namespace.
+        *
+        * @return The accumulator, or null if nothing is stored for the current key and namespace.
+        */
+       @Override
+       public ACC get() {
+               Preconditions.checkState(currentNamespace != null, "No namespace set.");
+               Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+
+               final Map<N, Map<K, ACC>> byNamespace = stateTable.get(backend.getCurrentKeyGroupIndex());
+               final Map<K, ACC> byKey = (byNamespace != null) ? byNamespace.get(currentNamespace) : null;
+
+               return (byKey != null) ? byKey.get(backend.getCurrentKey()) : null;
+       }
+
+       /**
+        * Folds the given value into the accumulator of the current key and namespace. The
+        * descriptor's default value is the starting accumulator when nothing is stored yet.
+        * Passing null clears the state for the current key and namespace.
+        *
+        * @param value The value to fold into the state, or null to clear the state.
+        */
+       @Override
+       public void add(T value) throws IOException {
+               Preconditions.checkState(currentNamespace != null, "No namespace set.");
+               Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+
+               if (value == null) {
+                       clear();
+                       return;
+               }
+
+               // lazily create the per-key-group map
+               final int keyGroup = backend.getCurrentKeyGroupIndex();
+               Map<N, Map<K, ACC>> byNamespace = stateTable.get(keyGroup);
+               if (byNamespace == null) {
+                       byNamespace = createNewMap();
+                       stateTable.set(keyGroup, byNamespace);
+               }
+
+               // lazily create the per-namespace map
+               Map<K, ACC> byKey = byNamespace.get(currentNamespace);
+               if (byKey == null) {
+                       byKey = createNewMap();
+                       byNamespace.put(currentNamespace, byKey);
+               }
+
+               final K key = backend.getCurrentKey();
+               final ACC stored = byKey.get(key);
+
+               try {
+                       final ACC accumulator = (stored != null) ? stored : stateDesc.getDefaultValue();
+                       byKey.put(key, foldFunction.fold(accumulator, value));
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not add value to folding state.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
new file mode 100644
index 0000000..fcb4bef
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * A {@link KeyedStateBackend} that keeps state on the Java Heap and will serialize state to
+ * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
+ * checkpointing.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+public class HeapKeyedStateBackend<K> extends KeyedStateBackend<K> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
+
+       /**
+        * Map of state tables that stores all state of key/value states, indexed by state name.
+        * We store it centrally so that we can easily checkpoint/restore it.
+        *
+        * <p>The actual type parameters of the tables are {@code StateTable<K, NamespaceT, StateT>}
+        * but we can't put them here because different key/value states with different types and
+        * namespace types share this central map of tables.
+        */
+       private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+
+       /**
+        * Creates a new, empty heap keyed state backend.
+        *
+        * @param kvStateRegistry Passed on to the {@link KeyedStateBackend} base class.
+        * @param keySerializer The serializer for the keys.
+        * @param keyGroupAssigner The assigner that maps keys to key groups.
+        * @param keyGroupRange The range of key groups this backend is responsible for.
+        */
+       public HeapKeyedStateBackend(
+                       TaskKvStateRegistry kvStateRegistry,
+                       TypeSerializer<K> keySerializer,
+                       KeyGroupAssigner<K> keyGroupAssigner,
+                       KeyGroupRange keyGroupRange) {
+
+               super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
+
+               LOG.info("Initializing heap keyed state backend with stream factory.");
+       }
+
+       /**
+        * Creates a heap keyed state backend and fills it with the state from the given handles.
+        *
+        * @param kvStateRegistry Passed on to the {@link KeyedStateBackend} base class.
+        * @param keySerializer The serializer for the keys.
+        * @param keyGroupAssigner The assigner that maps keys to key groups.
+        * @param keyGroupRange The range of key groups this backend is responsible for.
+        * @param restoredState The state handles to restore from; null entries are skipped.
+        * @throws Exception Thrown if reading the state handles fails.
+        */
+       public HeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry,
+                       TypeSerializer<K> keySerializer,
+                       KeyGroupAssigner<K> keyGroupAssigner,
+                       KeyGroupRange keyGroupRange,
+                       List<KeyGroupsStateHandle> restoredState) throws Exception {
+               super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
+
+               LOG.info("Initializing heap keyed state backend from snapshot.");
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
+               }
+
+               restorePartitionedState(restoredState);
+       }
+
+       // ------------------------------------------------------------------------
+       //  state backend operations
+       // ------------------------------------------------------------------------
+
+       @Override
+       public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
+               StateTable<K, N, V> stateTable =
+                               getOrCreateStateTable(stateDesc.getName(), stateDesc.getSerializer(), namespaceSerializer);
+
+               return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+       }
+
+       @Override
+       public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+               // list state is stored as one ArrayList per key
+               StateTable<K, N, ArrayList<T>> stateTable = getOrCreateStateTable(
+                               stateDesc.getName(),
+                               new ArrayListSerializer<>(stateDesc.getSerializer()),
+                               namespaceSerializer);
+
+               return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+       }
+
+       @Override
+       public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+               StateTable<K, N, T> stateTable =
+                               getOrCreateStateTable(stateDesc.getName(), stateDesc.getSerializer(), namespaceSerializer);
+
+               return new HeapReducingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+       }
+
+       @Override
+       protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+               StateTable<K, N, ACC> stateTable =
+                               getOrCreateStateTable(stateDesc.getName(), stateDesc.getSerializer(), namespaceSerializer);
+
+               return new HeapFoldingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+       }
+
+       /**
+        * Returns the state table registered under the given state name, creating and
+        * registering a new, empty one if none exists yet (for example because the state
+        * was not part of a restored snapshot).
+        */
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       private <N, SV> StateTable<K, N, SV> getOrCreateStateTable(
+                       String stateName,
+                       TypeSerializer<SV> stateSerializer,
+                       TypeSerializer<N> namespaceSerializer) {
+
+               StateTable<K, N, SV> stateTable = (StateTable) stateTables.get(stateName);
+
+               if (stateTable == null) {
+                       stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
+                       stateTables.put(stateName, stateTable);
+               }
+
+               return stateTable;
+       }
+
+       /**
+        * Writes all state tables to a checkpoint stream obtained from the given factory.
+        *
+        * <p>The stream starts with the number of states, followed by name, namespace serializer
+        * and state serializer of each state. After that, the data of each key group in this
+        * backend's key group range follows; the stream position at which each key group starts
+        * is recorded in the returned handle.
+        *
+        * @param checkpointId Passed on to the stream factory.
+        * @param timestamp Passed on to the stream factory.
+        * @param streamFactory The factory for the stream that the snapshot is written to.
+        * @return An already completed future holding the handle to the snapshot, or holding
+        *         null if this backend has no registered state.
+        * @throws Exception Thrown if writing the snapshot fails.
+        */
+       @Override
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       public RunnableFuture<KeyGroupsStateHandle> snapshot(
+                       long checkpointId,
+                       long timestamp,
+                       CheckpointStreamFactory streamFactory) throws Exception {
+
+               // check before opening the stream, so that no stream is leaked when there is nothing to write
+               if (stateTables.isEmpty()) {
+                       return new DoneFuture<>(null);
+               }
+
+               Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
+                               "Too many KV-States: " + stateTables.size() +
+                                               ". Currently at most " + Short.MAX_VALUE + " states are supported");
+
+               CheckpointStreamFactory.CheckpointStateOutputStream stream =
+                               streamFactory.createCheckpointStateOutputStream(
+                                               checkpointId,
+                                               timestamp);
+
+               try {
+                       DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+
+                       outView.writeShort(stateTables.size());
+
+                       // states are referenced by a running id in the per-key-group data
+                       Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
+
+                       for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+
+                               outView.writeUTF(kvState.getKey());
+
+                               TypeSerializer namespaceSerializer = kvState.getValue().getNamespaceSerializer();
+                               TypeSerializer stateSerializer = kvState.getValue().getStateSerializer();
+
+                               ObjectOutputStream oos = new ObjectOutputStream(outView);
+                               oos.writeObject(namespaceSerializer);
+                               oos.writeObject(stateSerializer);
+                               oos.flush();
+
+                               kVStateToId.put(kvState.getKey(), kVStateToId.size());
+                       }
+
+                       int offsetCounter = 0;
+                       long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
+
+                       for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
+                               // remember where the data of this key group starts
+                               keyGroupRangeOffsets[offsetCounter++] = stream.getPos();
+                               outView.writeInt(keyGroupIndex);
+
+                               for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+
+                                       outView.writeShort(kVStateToId.get(kvState.getKey()));
+
+                                       TypeSerializer namespaceSerializer = kvState.getValue().getNamespaceSerializer();
+                                       TypeSerializer stateSerializer = kvState.getValue().getStateSerializer();
+
+                                       // Map<NamespaceT, Map<KeyT, StateT>>
+                                       Map<?, ? extends Map<K, ?>> namespaceMap = kvState.getValue().get(keyGroupIndex);
+                                       if (namespaceMap == null) {
+                                               // marker: this state has no data in this key group
+                                               outView.writeByte(0);
+                                               continue;
+                                       }
+
+                                       outView.writeByte(1);
+
+                                       // number of namespaces
+                                       outView.writeInt(namespaceMap.size());
+                                       for (Map.Entry<?, ? extends Map<K, ?>> namespace : namespaceMap.entrySet()) {
+                                               namespaceSerializer.serialize(namespace.getKey(), outView);
+
+                                               Map<K, ?> entryMap = namespace.getValue();
+
+                                               // number of entries
+                                               outView.writeInt(entryMap.size());
+                                               for (Map.Entry<K, ?> entry : entryMap.entrySet()) {
+                                                       keySerializer.serialize(entry.getKey(), outView);
+                                                       stateSerializer.serialize(entry.getValue(), outView);
+                                               }
+                                       }
+                               }
+                               outView.flush();
+                       }
+
+                       StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+
+                       KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+                       final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+                       return new DoneFuture<>(keyGroupsStateHandle);
+               } catch (Exception e) {
+                       // do not leave the checkpoint stream open when the snapshot fails
+                       try {
+                               stream.close();
+                       } catch (Exception closeFailure) {
+                               e.addSuppressed(closeFailure);
+                       }
+                       throw e;
+               }
+       }
+
+       /**
+        * Restores the state tables from the given state handles, which must have been
+        * written by {@link #snapshot(long, long, CheckpointStreamFactory)}.
+        *
+        * @param state The state handles to restore from; null entries are skipped.
+        * @throws Exception Thrown if reading a state handle fails.
+        */
+       public void restorePartitionedState(List<KeyGroupsStateHandle> state) throws Exception {
+
+               for (KeyGroupsStateHandle keyGroupsHandle : state) {
+
+                       if (keyGroupsHandle == null) {
+                               continue;
+                       }
+
+                       restoreFromHandle(keyGroupsHandle);
+               }
+       }
+
+       /**
+        * Reads the state tables and the data of all key groups in this backend's key group
+        * range from the given handle. The input stream is closed when done.
+        */
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       private void restoreFromHandle(KeyGroupsStateHandle keyGroupsHandle) throws Exception {
+
+               try (FSDataInputStream fsDataInputStream = keyGroupsHandle.getStateHandle().openInputStream()) {
+                       DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
+
+                       int numKvStates = inView.readShort();
+
+                       Map<Integer, String> kvStatesById = new HashMap<>(numKvStates);
+
+                       // NOTE(review): a table registered under the same name (e.g. by a previously
+                       // restored handle) is replaced here - confirm that this is intended.
+                       for (int i = 0; i < numKvStates; ++i) {
+                               String stateName = inView.readUTF();
+
+                               ObjectInputStream ois = new ObjectInputStream(inView);
+
+                               TypeSerializer namespaceSerializer = (TypeSerializer) ois.readObject();
+                               TypeSerializer stateSerializer = (TypeSerializer) ois.readObject();
+                               StateTable<K, ?, ?> stateTable = new StateTable(stateSerializer,
+                                               namespaceSerializer,
+                                               keyGroupRange);
+                               stateTables.put(stateName, stateTable);
+                               kvStatesById.put(i, stateName);
+                       }
+
+                       for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
+                               long offset = keyGroupsHandle.getOffsetForKeyGroup(keyGroupIndex);
+                               fsDataInputStream.seek(offset);
+
+                               // verified unconditionally, a plain assert is skipped unless the JVM runs with -ea
+                               int writtenKeyGroupIndex = inView.readInt();
+                               Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+                                               "Unexpected key-group in restore.");
+
+                               for (int i = 0; i < numKvStates; i++) {
+                                       int kvStateId = inView.readShort();
+
+                                       byte isPresent = inView.readByte();
+                                       if (isPresent == 0) {
+                                               continue;
+                                       }
+
+                                       StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
+                                       Preconditions.checkNotNull(stateTable);
+
+                                       TypeSerializer namespaceSerializer = stateTable.getNamespaceSerializer();
+                                       TypeSerializer stateSerializer = stateTable.getStateSerializer();
+
+                                       // NOTE(review): restored maps are always plain HashMaps, also for states
+                                       // that would otherwise get concurrent maps - confirm for queryable state.
+                                       Map namespaceMap = new HashMap<>();
+                                       stateTable.set(keyGroupIndex, namespaceMap);
+
+                                       int numNamespaces = inView.readInt();
+                                       for (int k = 0; k < numNamespaces; k++) {
+                                               Object namespace = namespaceSerializer.deserialize(inView);
+                                               Map entryMap = new HashMap<>();
+                                               namespaceMap.put(namespace, entryMap);
+
+                                               int numEntries = inView.readInt();
+                                               for (int l = 0; l < numEntries; l++) {
+                                                       Object key = keySerializer.deserialize(inView);
+                                                       Object value = stateSerializer.deserialize(inView);
+                                                       entryMap.put(key, value);
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "HeapKeyedStateBackend";
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
new file mode 100644
index 0000000..4c65c25
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState}
+ * that is snapshotted into files.
+ *
+ * <p>Values are looked up in the state table in three steps: key group index,
+ * then namespace, then key.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+public class HeapListState<K, N, V>
+               extends AbstractHeapState<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+               implements ListState<V> {
+
+       /**
+        * Creates a new key/value state for the given hash map of key/value pairs.
+        *
+        * @param backend The state backend that created this state.
+        * @param stateDesc The state identifier for the state. This contains the name
+        *                           and can create a default state value.
+        * @param stateTable The state table to use in this key/value state. May contain initial state.
+        * @param keySerializer The serializer for the keys; handed to the base class.
+        * @param namespaceSerializer The serializer for the namespaces; handed to the base class.
+        */
+       public HeapListState(
+                       KeyedStateBackend<K> backend,
+                       ListStateDescriptor<V> stateDesc,
+                       StateTable<K, N, ArrayList<V>> stateTable,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer) {
+               super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+       }
+
+       /**
+        * Returns the list stored for the backend's current key in the current namespace.
+        *
+        * @return The stored list, or {@code null} if nothing is stored for the current
+        *         key group, namespace or key.
+        */
+       @Override
+       public Iterable<V> get() {
+               Preconditions.checkState(currentNamespace != null, "No namespace set.");
+               Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+
+               Map<N, Map<K, ArrayList<V>>> namespaceMap =
+                               stateTable.get(backend.getCurrentKeyGroupIndex());
+
+               if (namespaceMap == null) {
+                       return null;
+               }
+
+               Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
+
+               if (keyedMap == null) {
+                       return null;
+               }
+
+               return keyedMap.get(backend.<K>getCurrentKey());
+       }
+
+       /**
+        * Appends a value to the list of the backend's current key in the current namespace,
+        * creating the intermediate maps and the list on first use.
+        *
+        * @param value The value to append; a {@code null} value calls {@code clear()} instead
+        *              of being appended.
+        */
+       @Override
+       public void add(V value) {
+               Preconditions.checkState(currentNamespace != null, "No namespace set.");
+               Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+
+               if (value == null) {
+                       clear();
+                       return;
+               }
+
+               Map<N, Map<K, ArrayList<V>>> namespaceMap =
+                               stateTable.get(backend.getCurrentKeyGroupIndex());
+
+               if (namespaceMap == null) {
+                       namespaceMap = createNewMap();
+                       stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
+               }
+
+               Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
+
+               if (keyedMap == null) {
+                       keyedMap = createNewMap();
+                       namespaceMap.put(currentNamespace, keyedMap);
+               }
+
+               ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
+
+               if (list == null) {
+                       list = new ArrayList<>();
+                       keyedMap.put(backend.<K>getCurrentKey(), list);
+               }
+               list.add(value);
+       }
+
+       /**
+        * Serializes the list stored for the given key and namespace.
+        *
+        * <p>The lookup uses only the {@code key} and {@code namespace} arguments; it does not
+        * depend on the key or namespace currently set on this state object.
+        *
+        * @param key The key to look up; must not be {@code null}.
+        * @param namespace The namespace to look up; must not be {@code null}.
+        * @return The elements written one after another with the descriptor's serializer,
+        *         separated by a single {@code ','} byte, or {@code null} if no list is stored
+        *         for the given key and namespace.
+        * @throws Exception Propagated from the element serializer or the output view.
+        */
+       @Override
+       public byte[] getSerializedValue(K key, N namespace) throws Exception {
+               Preconditions.checkState(namespace != null, "No namespace given.");
+               Preconditions.checkState(key != null, "No key given.");
+
+               Map<N, Map<K, ArrayList<V>>> namespaceMap =
+                               stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key));
+
+               if (namespaceMap == null) {
+                       return null;
+               }
+
+               // Look up the requested namespace, not the one currently set on this state
+               // object: the caller asks for an explicit (key, namespace) pair.
+               Map<K, ArrayList<V>> keyedMap = namespaceMap.get(namespace);
+
+               if (keyedMap == null) {
+                       return null;
+               }
+
+               ArrayList<V> result = keyedMap.get(key);
+
+               if (result == null) {
+                       return null;
+               }
+
+               TypeSerializer<V> serializer = stateDesc.getSerializer();
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
+
+               // write the same as RocksDB writes lists, with one ',' separator
+               // between elements and none after the last one
+               for (int i = 0; i < result.size(); i++) {
+                       serializer.serialize(result.get(i), view);
+                       if (i < result.size() - 1) {
+                               view.writeByte(',');
+                       }
+               }
+               view.flush();
+
+               return baos.toByteArray();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
new file mode 100644
index 0000000..37aa812
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState}
+ * that is snapshotted into files.
+ *
+ * <p>Only one value is kept per key and namespace; every added value is folded into
+ * the stored one with the descriptor's {@link ReduceFunction}.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+public class HeapReducingState<K, N, V>
+               extends AbstractHeapState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>>
+               implements ReducingState<V> {
+
+       /** Function used to combine the stored value with a newly added one. */
+       private final ReduceFunction<V> reduceFunction;
+
+       /**
+        * Creates a new key/value state for the given hash map of key/value pairs.
+        *
+        * @param backend The state backend that created this state.
+        * @param stateDesc The state identifier for the state. This contains the name
+        *                           and can create a default state value.
+        * @param stateTable The state table to use in this key/value state. May contain initial state.
+        * @param keySerializer The serializer for the keys; handed to the base class.
+        * @param namespaceSerializer The serializer for the namespaces; handed to the base class.
+        */
+       public HeapReducingState(
+                       KeyedStateBackend<K> backend,
+                       ReducingStateDescriptor<V> stateDesc,
+                       StateTable<K, N, V> stateTable,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer) {
+               super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+               this.reduceFunction = stateDesc.getReduceFunction();
+       }
+
+       /**
+        * Returns the reduced value stored for the backend's current key in the current namespace.
+        *
+        * @return The stored value, or {@code null} if nothing is stored for the current
+        *         key group, namespace or key.
+        */
+       @Override
+       public V get() {
+               Preconditions.checkState(currentNamespace != null, "No namespace set.");
+               Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+
+               final Map<N, Map<K, V>> byNamespace =
+                               stateTable.get(backend.getCurrentKeyGroupIndex());
+               final Map<K, V> byKey =
+                               (byNamespace == null) ? null : byNamespace.get(currentNamespace);
+
+               return (byKey == null) ? null : byKey.get(backend.<K>getCurrentKey());
+       }
+
+       /**
+        * Folds a value into the state of the backend's current key in the current namespace.
+        *
+        * <p>The first value for a key is stored as-is. Later values are combined as
+        * {@code reduce(stored, value)} and the result replaces the stored value.
+        *
+        * @param value The value to add; a {@code null} value calls {@code clear()} instead.
+        * @throws IOException Declared by the interface; not thrown directly by this method.
+        * @throws RuntimeException If the reduce function throws; the original exception is the cause.
+        */
+       @Override
+       public void add(V value) throws IOException {
+               Preconditions.checkState(currentNamespace != null, "No namespace set.");
+               Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+
+               if (value == null) {
+                       clear();
+                       return;
+               }
+
+               Map<N, Map<K, V>> byNamespace = stateTable.get(backend.getCurrentKeyGroupIndex());
+               if (byNamespace == null) {
+                       byNamespace = createNewMap();
+                       stateTable.set(backend.getCurrentKeyGroupIndex(), byNamespace);
+               }
+
+               Map<K, V> byKey = byNamespace.get(currentNamespace);
+               if (byKey == null) {
+                       byKey = createNewMap();
+                       byNamespace.put(currentNamespace, byKey);
+               }
+
+               final K key = backend.<K>getCurrentKey();
+
+               // Store the new value first; put() hands back whatever was there before.
+               final V previous = byKey.put(key, value);
+               if (previous == null) {
+                       // nothing to reduce with, the new value is already in place
+                       return;
+               }
+
+               final V merged;
+               try {
+                       merged = reduceFunction.reduce(previous, value);
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not add value to reducing state.", e);
+               }
+               byKey.put(key, merged);
+       }
+}

Reply via email to