http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
new file mode 100644
index 0000000..fc9b1d4
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * The handler used by a {@link Client} to handling incoming messages.
+ *
+ * @param <REQ> the type of request the client will send.
+ * @param <RESP> the type of response the client expects to receive.
+ */
+@Internal
+public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> 
extends ChannelInboundHandlerAdapter {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ClientHandler.class);
+
+       private final String clientName;
+
+       private final MessageSerializer<REQ, RESP> serializer;
+
+       private final ClientHandlerCallback<RESP> callback;
+
+       /**
+        * Creates a handler with the callback.
+        *
+        * @param clientName the name of the client.
+        * @param serializer the serializer used to (de-)serialize messages.
+        * @param callback Callback for responses.
+        */
+       public ClientHandler(
+                       final String clientName,
+                       final MessageSerializer<REQ, RESP> serializer,
+                       final ClientHandlerCallback<RESP> callback) {
+
+               this.clientName = Preconditions.checkNotNull(clientName);
+               this.serializer = Preconditions.checkNotNull(serializer);
+               this.callback = Preconditions.checkNotNull(callback);
+       }
+
+       @Override
+       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+               try {
+                       ByteBuf buf = (ByteBuf) msg;
+                       MessageType msgType = 
MessageSerializer.deserializeHeader(buf);
+
+                       if (msgType == MessageType.REQUEST_RESULT) {
+                               long requestId = 
MessageSerializer.getRequestId(buf);
+                               RESP result = 
serializer.deserializeResponse(buf);
+                               callback.onRequestResult(requestId, result);
+                       } else if (msgType == MessageType.REQUEST_FAILURE) {
+                               RequestFailure failure = 
MessageSerializer.deserializeRequestFailure(buf);
+                               
callback.onRequestFailure(failure.getRequestId(), failure.getCause());
+                       } else if (msgType == MessageType.SERVER_FAILURE) {
+                               throw 
MessageSerializer.deserializeServerFailure(buf);
+                       } else {
+                               throw new IllegalStateException("Unexpected 
response type '" + msgType + "'");
+                       }
+               } catch (Throwable t1) {
+                       try {
+                               callback.onFailure(t1);
+                       } catch (Throwable t2) {
+                               LOG.error("Failed to notify callback about 
failure", t2);
+                       }
+               } finally {
+                       ReferenceCountUtil.release(msg);
+               }
+       }
+
+       @Override
+       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+               try {
+                       callback.onFailure(cause);
+               } catch (Throwable t) {
+                       LOG.error("Failed to notify callback about failure", t);
+               }
+       }
+
+       @Override
+       public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
+               // Only the client is expected to close the channel. Otherwise 
it
+               // indicates a failure. Note that this will be invoked in both 
cases
+               // though. If the callback closed the channel, the callback 
must be
+               // ignored.
+               try {
+                       callback.onFailure(new ClosedChannelException());
+               } catch (Throwable t) {
+                       LOG.error("Failed to notify callback about failure", t);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
new file mode 100644
index 0000000..00ce1ed
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+
+/**
+ * Callback for {@link ClientHandler}.
+ *
+ * <p>Separates per-request outcomes (result or failure carrying a request ID)
+ * from failures that are not tied to a specific request.
+ *
+ * @param <RESP> the type of the response passed to {@link #onRequestResult(long, MessageBody)}.
+ */
+@Internal
+public interface ClientHandlerCallback<RESP extends MessageBody> {
+
+       /**
+        * Called on a successful request.
+        *
+        * @param requestId ID of the request
+        * @param response  The received response
+        */
+       void onRequestResult(long requestId, RESP response);
+
+       /**
+        * Called on a failed request.
+        *
+        * @param requestId ID of the request
+        * @param cause     Cause of the request failure
+        */
+       void onRequestFailure(long requestId, Throwable cause);
+
+       /**
+        * Called on any failure which is not related to a specific request.
+        *
+        * <p>This can be, for example, an exception caught in the channel pipeline
+        * or an unexpected channel close.
+        *
+        * @param cause Cause of the failure
+        */
+       void onFailure(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
new file mode 100644
index 0000000..5e014b8
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Wrapper around Netty's {@link PooledByteBufAllocator} with strict control
+ * over the number of created arenas.
+ *
+ * <p>All direct, I/O and generic buffer requests are forwarded to the wrapped
+ * allocator; every heap buffer request is rejected with an
+ * {@link UnsupportedOperationException}.
+ */
+public class NettyBufferPool implements ByteBufAllocator {
+
+       /** Page size (in bytes) passed to the wrapped allocator. */
+       private static final int PAGE_SIZE = 8192;
+
+       /**
+        * Max order passed to the wrapped allocator. Arenas allocate chunks of
+        * {@code PAGE_SIZE << MAX_ORDER} bytes, i.e. 16 MB with these values.
+        */
+       private static final int MAX_ORDER = 11;
+
+       /** The wrapped buffer allocator. */
+       private final PooledByteBufAllocator alloc;
+
+       /**
+        * Creates Netty's buffer pool with the specified number of direct arenas
+        * and no heap arenas.
+        *
+        * @param numberOfArenas Number of arenas (recommended: 2 * number of task
+        *                       slots); must be at least 1.
+        */
+       public NettyBufferPool(int numberOfArenas) {
+               checkArgument(numberOfArenas >= 1, "Number of arenas");
+
+               this.alloc = new PooledByteBufAllocator(
+                               true,            // strictly prefer direct buffers
+                               0,               // no heap arenas, please
+                               numberOfArenas,  // number of direct arenas
+                               PAGE_SIZE,
+                               MAX_ORDER);
+       }
+
+       // ------------------------------------------------------------------------
+       // Calls delegated to the wrapped allocator
+       // ------------------------------------------------------------------------
+
+       @Override
+       public ByteBuf buffer() {
+               return alloc.buffer();
+       }
+
+       @Override
+       public ByteBuf buffer(int initialCapacity) {
+               return alloc.buffer(initialCapacity);
+       }
+
+       @Override
+       public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+               return alloc.buffer(initialCapacity, maxCapacity);
+       }
+
+       @Override
+       public ByteBuf ioBuffer() {
+               return alloc.ioBuffer();
+       }
+
+       @Override
+       public ByteBuf ioBuffer(int initialCapacity) {
+               return alloc.ioBuffer(initialCapacity);
+       }
+
+       @Override
+       public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
+               return alloc.ioBuffer(initialCapacity, maxCapacity);
+       }
+
+       @Override
+       public ByteBuf directBuffer() {
+               return alloc.directBuffer();
+       }
+
+       @Override
+       public ByteBuf directBuffer(int initialCapacity) {
+               return alloc.directBuffer(initialCapacity);
+       }
+
+       @Override
+       public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+               return alloc.directBuffer(initialCapacity, maxCapacity);
+       }
+
+       @Override
+       public CompositeByteBuf compositeBuffer() {
+               return alloc.compositeBuffer();
+       }
+
+       @Override
+       public CompositeByteBuf compositeBuffer(int maxNumComponents) {
+               return alloc.compositeBuffer(maxNumComponents);
+       }
+
+       @Override
+       public CompositeByteBuf compositeDirectBuffer() {
+               return alloc.compositeDirectBuffer();
+       }
+
+       @Override
+       public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
+               return alloc.compositeDirectBuffer(maxNumComponents);
+       }
+
+       @Override
+       public boolean isDirectBufferPooled() {
+               return alloc.isDirectBufferPooled();
+       }
+
+       // ------------------------------------------------------------------------
+       // Heap buffer allocations are prohibited
+       // ------------------------------------------------------------------------
+
+       @Override
+       public ByteBuf heapBuffer() {
+               throw new UnsupportedOperationException("Heap buffer");
+       }
+
+       @Override
+       public ByteBuf heapBuffer(int initialCapacity) {
+               throw new UnsupportedOperationException("Heap buffer");
+       }
+
+       @Override
+       public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+               throw new UnsupportedOperationException("Heap buffer");
+       }
+
+       @Override
+       public CompositeByteBuf compositeHeapBuffer() {
+               throw new UnsupportedOperationException("Heap buffer");
+       }
+
+       @Override
+       public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+               throw new UnsupportedOperationException("Heap buffer");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
new file mode 100644
index 0000000..f26c267
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The base class for every message exchanged during the communication between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ *
+ * <p>Subclasses only provide the serialization direction here; every such
+ * message should also have a matching {@link MessageDeserializer}.
+ */
+@Internal
+public abstract class MessageBody {
+
+       /**
+        * Serializes the message into a byte array.
+        *
+        * @return A byte array with the serialized content of the message.
+        */
+       public abstract byte[] serialize();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
new file mode 100644
index 0000000..436fb82
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+/**
+ * A utility used to deserialize a {@link MessageBody message}.
+ *
+ * @param <M> The type of the message to be deserialized.
+ *            It has to extend {@link MessageBody}.
+ */
+@Internal
+public interface MessageDeserializer<M extends MessageBody> {
+
+       /**
+        * Deserializes a message contained in a byte buffer.
+        *
+        * @param buf the buffer containing the message.
+        * @return The deserialized message.
+        */
+       M deserializeMessage(ByteBuf buf);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
new file mode 100644
index 0000000..c0a0d32
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+
+/**
+ * Serialization and deserialization of messages exchanged between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ *
+ * <p>The binary messages have the following format:
+ *
+ * <pre>
+ *                     <------ Frame ------------------------->
+ *                    +----------------------------------------+
+ *                    |        HEADER (8)      | PAYLOAD (VAR) |
+ * +------------------+----------------------------------------+
+ * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) |
+ * +------------------+----------------------------------------+
+ * </pre>
+ *
+ * <p>The concrete content of a message depends on the {@link MessageType}.
+ *
+ * @param <REQ>                Type of the requests of the protocol.
+ * @param <RESP>       Type of the responses of the protocol.
+ */
+@Internal
+public final class MessageSerializer<REQ extends MessageBody, RESP extends 
MessageBody> {
+
+       /** The serialization version ID, written first in every header and checked when reading one. */
+       private static final int VERSION = 0x79a1b710;
+
+       /** Byte length of the header: two ints, the version and the message type. */
+       private static final int HEADER_LENGTH = 2 * Integer.BYTES;
+
+       /** Byte length of the request id, which is written as a long. */
+       private static final int REQUEST_ID_SIZE = Long.BYTES;
+
+       /** The deserializer for the {@link MessageBody client requests}. */
+       private final MessageDeserializer<REQ> requestDeserializer;
+
+       /** The deserializer for the {@link MessageBody server responses}. */
+       private final MessageDeserializer<RESP> responseDeserializer;
+
+       /**
+        * Creates a serializer that uses the given deserializers to read requests and responses.
+        *
+        * @param requestDeser  The deserializer for the requests; checked for {@code null}.
+        * @param responseDeser The deserializer for the responses; checked for {@code null}.
+        */
+       public MessageSerializer(
+                       final MessageDeserializer<REQ> requestDeser,
+                       final MessageDeserializer<RESP> responseDeser) {
+               this.requestDeserializer = Preconditions.checkNotNull(requestDeser);
+               this.responseDeserializer = Preconditions.checkNotNull(responseDeser);
+       }
+
+       // 
------------------------------------------------------------------------
+       // Serialization
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Serializes the request sent to the
+        * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
+        *
+        * @param alloc     The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+        * @param requestId The id of the request to which the message refers.
+        * @param request   The request to be serialized; checked for {@code null}.
+        * @return A {@link ByteBuf} containing the serialized message.
+        */
+       public static <REQ extends MessageBody> ByteBuf serializeRequest(
+                       final ByteBufAllocator alloc,
+                       final long requestId,
+                       final REQ request) {
+               final byte[] serializedRequest = Preconditions.checkNotNull(request).serialize();
+               return writePayload(alloc, requestId, MessageType.REQUEST, serializedRequest);
+       }
+
+       /**
+        * Serializes the response sent to the
+        * {@link org.apache.flink.queryablestate.network.Client}.
+        *
+        * @param alloc     The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+        * @param requestId The id of the request to which the message refers.
+        * @param response  The response to be serialized; checked for {@code null}.
+        * @return A {@link ByteBuf} containing the serialized message.
+        */
+       public static <RESP extends MessageBody> ByteBuf serializeResponse(
+                       final ByteBufAllocator alloc,
+                       final long requestId,
+                       final RESP response) {
+               final byte[] serializedResponse = Preconditions.checkNotNull(response).serialize();
+               return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, serializedResponse);
+       }
+
+       /**
+        * Serializes the exception containing the failure message sent to the
+        * {@link org.apache.flink.queryablestate.network.Client} in case of
+        * protocol related errors.
+        *
+        * <p>If serialization fails, the allocated buffer is released before the
+        * exception is propagated, so the caller never has to clean up after a failure.
+        *
+        * @param alloc     The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+        * @param requestId The id of the request to which the message refers.
+        * @param cause     The exception thrown at the server.
+        * @return A {@link ByteBuf} containing the serialized message.
+        * @throws IOException If the cause cannot be written with Java serialization.
+        */
+       public static ByteBuf serializeRequestFailure(
+                       final ByteBufAllocator alloc,
+                       final long requestId,
+                       final Throwable cause) throws IOException {
+
+               final ByteBuf buf = alloc.ioBuffer();
+               boolean success = false;
+
+               try {
+                       // Frame length is set at the end
+                       buf.writeInt(0);
+                       writeHeader(buf, MessageType.REQUEST_FAILURE);
+                       buf.writeLong(requestId);
+
+                       try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+                                       ObjectOutput out = new ObjectOutputStream(bbos)) {
+                               out.writeObject(cause);
+                       }
+
+                       // Set frame length (everything written after the length field itself)
+                       int frameLength = buf.readableBytes() - Integer.BYTES;
+                       buf.setInt(0, frameLength);
+
+                       success = true;
+                       return buf;
+               } finally {
+                       // Nobody else holds a reference yet, so a failed serialization
+                       // (e.g. a non-serializable cause) would otherwise leak the buffer.
+                       if (!success) {
+                               buf.release();
+                       }
+               }
+       }
+
+       /**
+        * Serializes the failure message sent to the
+        * {@link org.apache.flink.queryablestate.network.Client} in case of
+        * server related errors.
+        *
+        * <p>If serialization fails, the allocated buffer is released before the
+        * exception is propagated, so the caller never has to clean up after a failure.
+        *
+        * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+        * @param cause The exception thrown at the server.
+        * @return The failure message.
+        * @throws IOException If the cause cannot be written with Java serialization.
+        */
+       public static ByteBuf serializeServerFailure(
+                       final ByteBufAllocator alloc,
+                       final Throwable cause) throws IOException {
+
+               final ByteBuf buf = alloc.ioBuffer();
+               boolean success = false;
+
+               try {
+                       // Frame length is set at end
+                       buf.writeInt(0);
+                       writeHeader(buf, MessageType.SERVER_FAILURE);
+
+                       try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+                                       ObjectOutput out = new ObjectOutputStream(bbos)) {
+                               out.writeObject(cause);
+                       }
+
+                       // Set frame length (everything written after the length field itself)
+                       int frameLength = buf.readableBytes() - Integer.BYTES;
+                       buf.setInt(0, frameLength);
+
+                       success = true;
+                       return buf;
+               } finally {
+                       // Nobody else holds a reference yet, so a failed serialization
+                       // (e.g. a non-serializable cause) would otherwise leak the buffer.
+                       if (!success) {
+                               buf.release();
+                       }
+               }
+       }
+
+       /**
+        * Helper for serializing the header: the version followed by the message type.
+        *
+        * @param buf         The {@link ByteBuf} to serialize the header into.
+        * @param messageType The {@link MessageType} of the message this header refers to.
+        */
+       private static void writeHeader(final ByteBuf buf, final MessageType messageType) {
+               // The type travels as its ordinal; the reading side maps it back by
+               // indexing into MessageType.values().
+               buf.writeInt(VERSION).writeInt(messageType.ordinal());
+       }
+
+       /**
+        * Helper for serializing the messages.
+        *
+        * <p>Writes the frame length, the header, the request id and finally the payload
+        * into a freshly allocated buffer sized to hold exactly that content.
+        *
+        * @param alloc       The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+        * @param requestId   The id of the request to which the message refers.
+        * @param messageType The {@link MessageType type of the message}.
+        * @param payload     The serialized version of the message.
+        * @return A {@link ByteBuf} containing the serialized message.
+        */
+       private static ByteBuf writePayload(
+                       final ByteBufAllocator alloc,
+                       final long requestId,
+                       final MessageType messageType,
+                       final byte[] payload) {
+
+               // The frame length counts everything that follows the length field itself.
+               final int frameSize = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length;
+               final ByteBuf frame = alloc.ioBuffer(Integer.BYTES + frameSize);
+
+               frame.writeInt(frameSize);
+               writeHeader(frame, messageType);
+               frame.writeLong(requestId).writeBytes(payload);
+
+               return frame;
+       }
+
+       // 
------------------------------------------------------------------------
+       // Deserialization
+       // 
------------------------------------------------------------------------
+
+       /**
+        * De-serializes the header and returns the {@link MessageType}.
+        * <pre>
+        *  <b>The buffer is expected to be at the header position.</b>
+        * </pre>
+        * @param buf The {@link ByteBuf} containing the serialized header.
+        * @return The message type.
+        * @throws IllegalStateException If unexpected message version or message type.
+        */
+       public static MessageType deserializeHeader(final ByteBuf buf) {
+
+               // first int: the serialization version
+               final int foundVersion = buf.readInt();
+               final boolean versionMatches = VERSION == foundVersion;
+               Preconditions.checkState(versionMatches,
+                               "Version Mismatch:  Found " + foundVersion + ", Expected: " + VERSION + '.');
+
+               // second int: the index of the message type
+               final int typeIndex = buf.readInt();
+               final MessageType[] knownTypes = MessageType.values();
+               final boolean indexInRange = 0 <= typeIndex && typeIndex < knownTypes.length;
+               Preconditions.checkState(indexInRange,
+                               "Illegal message type with index " + typeIndex + '.');
+
+               return knownTypes[typeIndex];
+       }
+
+       /**
+        * De-serializes the request id by reading one {@code long} from the buffer.
+        * <pre>
+        *  <b>The buffer is expected to be at the request id position.</b>
+        * </pre>
+        * @param buf   The {@link ByteBuf} containing the serialized request id.
+        * @return              The request id.
+        */
+       public static long getRequestId(final ByteBuf buf) {
+               return buf.readLong();
+       }
+
+       /**
+        * Turns the serialized request received by the
+        * {@link org.apache.flink.queryablestate.network.AbstractServerBase}
+        * back into a request object, using the request deserializer.
+        * <pre>
+        *  <b>The buffer is expected to be at the request position.</b>
+        * </pre>
+        * @param buf   The {@link ByteBuf} containing the serialized request; must not be {@code null}.
+        * @return              The request.
+        */
+       public REQ deserializeRequest(final ByteBuf buf) {
+               final ByteBuf serializedRequest = Preconditions.checkNotNull(buf);
+               return requestDeserializer.deserializeMessage(serializedRequest);
+       }
+
+       /**
+        * Turns the serialized response received by the
+        * {@link org.apache.flink.queryablestate.network.Client}
+        * back into a response object, using the response deserializer.
+        * <pre>
+        *  <b>The buffer is expected to be at the response position.</b>
+        * </pre>
+        * @param buf   The {@link ByteBuf} containing the serialized response; must not be {@code null}.
+        * @return              The response.
+        */
+       public RESP deserializeResponse(final ByteBuf buf) {
+               final ByteBuf serializedResponse = Preconditions.checkNotNull(buf);
+               return responseDeserializer.deserializeMessage(serializedResponse);
+       }
+
+       /**
+        * De-serializes the {@link RequestFailure} sent to the
+        * {@link org.apache.flink.queryablestate.network.Client} in case of
+        * protocol related errors.
+        *
+        * <p>The request id is read first, then the cause is read as a Java-serialized
+        * {@link Throwable} from the rest of the buffer.
+        * <pre>
+        *  <b>The buffer is expected to be at the correct position.</b>
+        * </pre>
+        * @param buf   The {@link ByteBuf} containing the serialized failure message.
+        * @return              The failure message.
+        * @throws IOException                  If reading the serialized cause fails.
+        * @throws ClassNotFoundException       If the class of the serialized cause cannot be found.
+        */
+       public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
+               // The request id precedes the serialized cause.
+               final long requestId = buf.readLong();
+
+               // NOTE(review): this is Java-native deserialization of bytes that presumably
+               // arrive over the network -- confirm the peer is trusted, or consider a filter.
+               try (ByteBufInputStream bufferStream = new ByteBufInputStream(buf);
+                               ObjectInputStream objectStream = new ObjectInputStream(bufferStream)) {
+                       final Throwable cause = (Throwable) objectStream.readObject();
+                       return new RequestFailure(requestId, cause);
+               }
+       }
+
+       /**
+        * De-serializes the failure message sent to the
+        * {@link org.apache.flink.queryablestate.network.Client} in case of
+        * server related errors.
+        *
+        * <p>The failure is read as a Java-serialized {@link Throwable}.
+        * <pre>
+        *  <b>The buffer is expected to be at the correct position.</b>
+        * </pre>
+        * @param buf   The {@link ByteBuf} containing the serialized failure message.
+        * @return              The failure message.
+        * @throws IOException                  If reading the serialized failure fails.
+        * @throws ClassNotFoundException       If the class of the serialized failure cannot be found.
+        */
+       public static Throwable deserializeServerFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
+               final Throwable failure;
+               try (ByteBufInputStream bufferStream = new ByteBufInputStream(buf);
+                               ObjectInputStream objectStream = new ObjectInputStream(bufferStream)) {
+                       failure = (Throwable) objectStream.readObject();
+               }
+               return failure;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
new file mode 100644
index 0000000..562ce93
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Expected message types during the communication between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ *
+ * <p>NOTE(review): the message serializer appears to write {@code ordinal()} into the
+ * header and to resolve it through {@code values()} when reading, so the declaration
+ * order of the constants is presumably part of the wire format -- confirm before
+ * reordering or inserting constants.
+ */
+@Internal
+public enum MessageType {
+
+       /** The message is a request. */
+       REQUEST,
+
+       /** The message is a successful response. */
+       REQUEST_RESULT,
+
+       /** The message indicates a protocol-related failure. */
+       REQUEST_FAILURE,
+
+       /** The message indicates a server failure. */
+       SERVER_FAILURE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
new file mode 100644
index 0000000..106199f
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Message signalling a protocol-related error for a single request.
+ */
+@Internal
+public class RequestFailure {
+
+       /** Id of the request this failure responds to. */
+       private final long requestId;
+
+       /** Reason for the failure. Not allowed to be a user type. */
+       private final Throwable cause;
+
+       /**
+        * Creates a failure response to a {@link MessageBody}.
+        *
+        * @param requestId Id of the request this failure responds to
+        * @param cause     Reason for the failure (not allowed to be a user type)
+        */
+       public RequestFailure(long requestId, Throwable cause) {
+               this.cause = cause;
+               this.requestId = requestId;
+       }
+
+       /**
+        * Returns the id of the request this failure responds to.
+        *
+        * @return Id of the request responding to
+        */
+       public long getRequestId() {
+               return this.requestId;
+       }
+
+       /**
+        * Returns the reason for the failure.
+        *
+        * @return Failure cause
+        */
+       public Throwable getCause() {
+               return this.cause;
+       }
+
+       @Override
+       public String toString() {
+               final StringBuilder text = new StringBuilder("RequestFailure{");
+               text.append("requestId=").append(requestId);
+               text.append(", cause=").append(cause);
+               text.append('}');
+               return text.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
new file mode 100644
index 0000000..9ba5f84
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link KvStateRequestStats} implementation that keeps every statistic in an
+ * {@link AtomicLong} counter.
+ */
+public class AtomicKvStateRequestStats implements KvStateRequestStats {
+
+       /** Number of active connections. */
+       private final AtomicLong numConnections = new AtomicLong();
+
+       /** Total number of reported requests. */
+       private final AtomicLong numRequests = new AtomicLong();
+
+       /** Total number of successful requests ({@code <=} reported requests). */
+       private final AtomicLong numSuccessful = new AtomicLong();
+
+       /** Total duration of all successful requests. */
+       private final AtomicLong successfulDuration = new AtomicLong();
+
+       /** Total number of failed requests ({@code <=} reported requests). */
+       private final AtomicLong numFailed = new AtomicLong();
+
+       @Override
+       public void reportActiveConnection() {
+               numConnections.getAndIncrement();
+       }
+
+       @Override
+       public void reportInactiveConnection() {
+               numConnections.getAndDecrement();
+       }
+
+       @Override
+       public void reportRequest() {
+               numRequests.getAndIncrement();
+       }
+
+       @Override
+       public void reportSuccessfulRequest(long durationTotalMillis) {
+               // Count the request and add its duration to the running total.
+               numSuccessful.getAndIncrement();
+               successfulDuration.getAndAdd(durationTotalMillis);
+       }
+
+       @Override
+       public void reportFailedRequest() {
+               numFailed.getAndIncrement();
+       }
+
+       /**
+        * Returns the current number of active connections.
+        */
+       public long getNumConnections() {
+               return numConnections.get();
+       }
+
+       /**
+        * Returns the total number of reported requests.
+        */
+       public long getNumRequests() {
+               return numRequests.get();
+       }
+
+       /**
+        * Returns the total number of successful requests.
+        */
+       public long getNumSuccessful() {
+               return numSuccessful.get();
+       }
+
+       /**
+        * Returns the total number of failed requests.
+        */
+       public long getNumFailed() {
+               return numFailed.get();
+       }
+
+       @Override
+       public String toString() {
+               final StringBuilder text = new StringBuilder("AtomicKvStateRequestStats{");
+               text.append("numConnections=").append(numConnections.get());
+               text.append(", numRequests=").append(numRequests.get());
+               text.append(", numSuccessful=").append(numSuccessful.get());
+               text.append(", numFailed=").append(numFailed.get());
+               text.append('}');
+               return text.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
new file mode 100644
index 0000000..b34ac3e
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.stats;
+
+/**
+ * Disabled {@link KvStateRequestStats} implementation.
+ *
+ * <p>Every reporting method has an empty body, so nothing is recorded.
+ */
+public class DisabledKvStateRequestStats implements KvStateRequestStats {
+
+       @Override
+       public void reportActiveConnection() {
+               // intentionally empty
+       }
+
+       @Override
+       public void reportInactiveConnection() {
+               // intentionally empty
+       }
+
+       @Override
+       public void reportRequest() {
+               // intentionally empty
+       }
+
+       @Override
+       public void reportSuccessfulRequest(long durationTotalMillis) {
+               // intentionally empty
+       }
+
+       @Override
+       public void reportFailedRequest() {
+               // intentionally empty
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
new file mode 100644
index 0000000..8e9edd8
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.stats;
+
+/**
+ * Simple statistics for monitoring the state server
+ * and the client proxy.
+ *
+ * <p>Implementations in this package are {@code AtomicKvStateRequestStats}, which
+ * keeps counters, and {@code DisabledKvStateRequestStats}, which ignores all reports.
+ */
+public interface KvStateRequestStats {
+
+       /**
+        * Reports an active connection.
+        */
+       void reportActiveConnection();
+
+       /**
+        * Reports an inactive connection.
+        *
+        * <p>NOTE(review): presumably called once for a connection that was previously
+        * reported as active -- confirm against the callers.
+        */
+       void reportInactiveConnection();
+
+       /**
+        * Reports an incoming request.
+        */
+       void reportRequest();
+
+       /**
+        * Reports a successfully handled request.
+        *
+        * @param durationTotalMillis Duration of the request (in milliseconds).
+        */
+       void reportSuccessfulRequest(long durationTotalMillis);
+
+       /**
+        * Reports a failure during a request.
+        */
+       void reportFailedRequest();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
new file mode 100644
index 0000000..ca11a32
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+/**
+ * Runs the generic {@link TypeInformationTestBase} checks against {@link VoidNamespaceTypeInfo}.
+ */
+public class VoidNamespaceTypeInfoTest extends TypeInformationTestBase<VoidNamespaceTypeInfo> {
+
+       @Override
+       protected VoidNamespaceTypeInfo[] getTestData() {
+               // The singleton instance is the only test datum.
+               final VoidNamespaceTypeInfo[] testData = { VoidNamespaceTypeInfo.INSTANCE };
+               return testData;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
new file mode 100644
index 0000000..ebbc896
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableAggregatingState}.
+ */
+public class ImmutableAggregatingStateTest {
+
+       private final AggregatingStateDescriptor<Long, String, String> aggrStateDesc =
+                       new AggregatingStateDescriptor<>("test", new SumAggr(), String.class);
+
+       private ImmutableAggregatingState<Long, String> aggrState;
+
+       /**
+        * Creates the state under test from the serialized form of the value "42".
+        */
+       @Before
+       public void setUp() throws Exception {
+               if (!aggrStateDesc.isSerializerInitialized()) {
+                       aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+               }
+
+               final String initValue = "42";
+
+               final ByteArrayOutputStream serializedInit = new ByteArrayOutputStream();
+               final DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(serializedInit);
+               aggrStateDesc.getSerializer().serialize(initValue, target);
+
+               aggrState = ImmutableAggregatingState.createState(aggrStateDesc, serializedInit.toByteArray());
+       }
+
+       /**
+        * Reading the value works, adding to the state is expected to be rejected.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               assertEquals("42", aggrState.get());
+
+               aggrState.add(54L);
+       }
+
+       /**
+        * Reading the value works, clearing the state is expected to be rejected.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               assertEquals("42", aggrState.get());
+
+               aggrState.clear();
+       }
+
+       /**
+        * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
+        */
+       private static class SumAggr implements AggregateFunction<Long, String, String> {
+
+               private static final long serialVersionUID = -6249227626701264599L;
+
+               @Override
+               public String createAccumulator() {
+                       return "";
+               }
+
+               @Override
+               public String add(Long value, String accumulator) {
+                       return accumulator + ", " + value;
+               }
+
+               @Override
+               public String getResult(String accumulator) {
+                       return accumulator;
+               }
+
+               @Override
+               public String merge(String a, String b) {
+                       return a + ", " + b;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
new file mode 100644
index 0000000..9e8dfc9
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableFoldingState}.
+ */
+public class ImmutableFoldingStateTest {
+
+       private final FoldingStateDescriptor<Long, String> foldingStateDesc =
+                       new FoldingStateDescriptor<>("test", "0", new SumFold(), StringSerializer.INSTANCE);
+
+       private ImmutableFoldingState<Long, String> foldingState;
+
+       /**
+        * Creates the state under test from the serialized form of the value "42".
+        */
+       @Before
+       public void setUp() throws Exception {
+               if (!foldingStateDesc.isSerializerInitialized()) {
+                       foldingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+               }
+
+               final ByteArrayOutputStream serializedInit = new ByteArrayOutputStream();
+               final DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(serializedInit);
+               StringSerializer.INSTANCE.serialize("42", target);
+
+               foldingState = ImmutableFoldingState.createState(foldingStateDesc, serializedInit.toByteArray());
+       }
+
+       /**
+        * Reading the value works, adding to the state is expected to be rejected.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               assertEquals("42", foldingState.get());
+
+               foldingState.add(54L);
+       }
+
+       /**
+        * Reading the value works, clearing the state is expected to be rejected.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               assertEquals("42", foldingState.get());
+
+               foldingState.clear();
+       }
+
+       /**
+        * Test {@link FoldFunction} that parses the stored string as a long, adds the
+        * long passed as argument and returns the sum as a string.
+        */
+       private static class SumFold implements FoldFunction<Long, String> {
+
+               private static final long serialVersionUID = -6249227626701264599L;
+
+               @Override
+               public String fold(String accumulator, Long value) throws Exception {
+                       final long sum = Long.parseLong(accumulator) + value;
+                       return Long.toString(sum);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
new file mode 100644
index 0000000..a78ed1f
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableListState}.
+ */
+public class ImmutableListStateTest {
+
+       private final ListStateDescriptor<Long> listStateDesc =
+                       new ListStateDescriptor<>("test", 
BasicTypeInfo.LONG_TYPE_INFO);
+
+       private ImmutableListState<Long> listState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!listStateDesc.isSerializerInitialized()) {
+                       listStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               List<Long> init = new ArrayList<>();
+               init.add(42L);
+
+               byte[] serInit = serializeInitValue(init);
+               listState = ImmutableListState.createState(listStateDesc, 
serInit);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               List<Long> list = getStateContents();
+               assertEquals(1L, list.size());
+
+               long element = list.get(0);
+               assertEquals(42L, element);
+
+               listState.add(54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               List<Long> list = getStateContents();
+               assertEquals(1L, list.size());
+
+               long element = list.get(0);
+               assertEquals(42L, element);
+
+               listState.clear();
+       }
+
+       /**
+        * Copied from HeapListState.getSerializedValue(Object, Object).
+        */
+       private byte[] serializeInitValue(List<Long> toSerialize) throws 
IOException {
+               TypeSerializer<Long> serializer = 
listStateDesc.getElementSerializer();
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(baos);
+
+               // write the same as RocksDB writes lists, with one ',' 
separator
+               for (int i = 0; i < toSerialize.size(); i++) {
+                       serializer.serialize(toSerialize.get(i), view);
+                       if (i < toSerialize.size() - 1) {
+                               view.writeByte(',');
+                       }
+               }
+               view.flush();
+
+               return baos.toByteArray();
+       }
+
+       private List<Long> getStateContents() {
+               List<Long> list = new ArrayList<>();
+               for (Long elem: listState.get()) {
+                       list.add(elem);
+               }
+               return list;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
new file mode 100644
index 0000000..ffeabae
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link ImmutableMapState}.
+ */
+public class ImmutableMapStateTest {
+
+       private final MapStateDescriptor<Long, Long> mapStateDesc =
+                       new MapStateDescriptor<>(
+                                       "test",
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       BasicTypeInfo.LONG_TYPE_INFO);
+
+       private ImmutableMapState<Long, Long> mapState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!mapStateDesc.isSerializerInitialized()) {
+                       mapStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               Map<Long, Long> initMap = new HashMap<>();
+               initMap.put(1L, 5L);
+               initMap.put(2L, 5L);
+
+               byte[] initSer = KvStateSerializer.serializeMap(
+                               initMap.entrySet(),
+                               
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+               mapState = ImmutableMapState.createState(mapStateDesc, initSer);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testPut() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               mapState.put(2L, 54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testPutAll() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Map<Long, Long> nMap = new HashMap<>();
+               nMap.put(1L, 7L);
+               nMap.put(2L, 7L);
+
+               mapState.putAll(nMap);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               mapState.put(2L, 54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testIterator() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator();
+               while (iterator.hasNext()) {
+                       iterator.remove();
+               }
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testIterable() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Iterable<Map.Entry<Long, Long>> iterable = mapState.entries();
+               Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator();
+               while (iterator.hasNext()) {
+                       assertEquals(5L, (long) iterator.next().getValue());
+                       iterator.remove();
+               }
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testKeys() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Iterator<Long> iterator = mapState.keys().iterator();
+               while (iterator.hasNext()) {
+                       iterator.remove();
+               }
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testValues() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               Iterator<Long> iterator = mapState.values().iterator();
+               while (iterator.hasNext()) {
+                       iterator.remove();
+               }
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               assertTrue(mapState.contains(1L));
+               long value = mapState.get(1L);
+               assertEquals(5L, value);
+
+               assertTrue(mapState.contains(2L));
+               value = mapState.get(2L);
+               assertEquals(5L, value);
+
+               mapState.clear();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
new file mode 100644
index 0000000..9694f55
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableReducingState}.
+ */
+public class ImmutableReducingStateTest {
+
+       private final ReducingStateDescriptor<Long> reducingStateDesc =
+                       new ReducingStateDescriptor<>("test", new SumReduce(), 
BasicTypeInfo.LONG_TYPE_INFO);
+
+       private ImmutableReducingState<Long> reduceState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!reducingStateDesc.isSerializerInitialized()) {
+                       reducingStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               reduceState = ImmutableReducingState.createState(
+                               reducingStateDesc,
+                               
ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+               );
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               long value = reduceState.get();
+               assertEquals(42L, value);
+
+               reduceState.add(54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               long value = reduceState.get();
+               assertEquals(42L, value);
+
+               reduceState.clear();
+       }
+
+       /**
+        * Test {@link ReduceFunction} summing up its two arguments.
+        */
+       private static class SumReduce implements ReduceFunction<Long> {
+
+               private static final long serialVersionUID = 
6041237513913189144L;
+
+               @Override
+               public Long reduce(Long value1, Long value2) throws Exception {
+                       return value1 + value2;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
new file mode 100644
index 0000000..a0da43d
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableValueState}.
+ */
+public class ImmutableValueStateTest {
+
+       private final ValueStateDescriptor<Long> valueStateDesc =
+                       new ValueStateDescriptor<>("test", 
BasicTypeInfo.LONG_TYPE_INFO);
+
+       private ImmutableValueState<Long> valueState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!valueStateDesc.isSerializerInitialized()) {
+                       valueStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               valueState = ImmutableValueState.createState(
+                               valueStateDesc,
+                               
ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+               );
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               long value = valueState.value();
+               assertEquals(42L, value);
+
+               valueState.update(54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               long value = valueState.value();
+               assertEquals(42L, value);
+
+               valueState.clear();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..10792cd
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender writing to System.err.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/pom.xml 
b/flink-queryable-state/flink-queryable-state-java/pom.xml
deleted file mode 100644
index e60c6f3..0000000
--- a/flink-queryable-state/flink-queryable-state-java/pom.xml
+++ /dev/null
@@ -1,137 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-queryable-state</artifactId>
-               <version>1.4-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       
<artifactId>flink-queryable-state-java_${scala.binary.version}</artifactId>
-       <name>flink-queryable-state-java</name>
-       <packaging>jar</packaging>
-
-       <dependencies>
-
-               <!-- core dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <!-- test dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-          <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <!-- ===================================================
-                                                               Testing
-                       =================================================== -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils-junit</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.curator</groupId>
-                       <artifactId>curator-test</artifactId>
-                       <version>${curator.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
deleted file mode 100644
index fa2604b..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * Exception to fail Future if the Task Manager on which the
- * {@link org.apache.flink.runtime.query.KvStateClientProxy}
- * is running on, does not know the active Job Manager.
- */
-@Internal
-public class UnknownJobManagerException extends Exception {
-
-       private static final long serialVersionUID = 9092442511708951209L;
-
-       public UnknownJobManagerException() {
-               super("Unknown JobManager. Either the JobManager has not 
registered yet or has lost leadership.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
deleted file mode 100644
index c497a72..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.BadRequestException;
-
-/**
- * Thrown if the KvState does not hold any state for the given key or 
namespace.
- */
-@Internal
-public class UnknownKeyOrNamespaceException extends BadRequestException {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Creates the exception.
-        * @param serverName the name of the server that threw the exception.
-        */
-       public UnknownKeyOrNamespaceException(String serverName) {
-               super(serverName, "No state for the specified key/namespace.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
deleted file mode 100644
index 59ba081..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.BadRequestException;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Thrown if no KvState with the given ID cannot found by the server handler.
- */
-@Internal
-public class UnknownKvStateIdException extends BadRequestException {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Creates the exception.
-        * @param serverName the name of the server that threw the exception.
-        * @param kvStateId the state id for which no state was found.
-        */
-       public UnknownKvStateIdException(String serverName, KvStateID kvStateId) {
-               super(serverName, "No registered state with ID " + Preconditions.checkNotNull(kvStateId) + '.');
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
deleted file mode 100644
index 0d6588a..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.BadRequestException;
-import org.apache.flink.runtime.query.KvStateLocation;
-
-/**
- * Exception thrown if there is no location information available for the given
- * key group in a {@link KvStateLocation} instance.
- */
-@Internal
-public class UnknownKvStateKeyGroupLocationException extends BadRequestException {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Creates the exception.
-        * @param serverName the name of the server that threw the exception.
-        */
-       public UnknownKvStateKeyGroupLocationException(String serverName) {
-               super(serverName, "Unknown key-group location.");
-       }
-}

Reply via email to