http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
new file mode 100644
index 0000000..e6d59de
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The base class for every client in the queryable state module.
+ * It uses plain Netty to send and receive messages of type {@link MessageBody}.
+ *
+ * @param <REQ> the type of request the client will send.
+ * @param <RESP> the type of response the client expects to receive.
+ */
+@Internal
+public class Client<REQ extends MessageBody, RESP extends MessageBody> {
+
+       /** The name of the client. Used for logging and stack traces.*/
+       private final String clientName;
+
+       /** Netty's Bootstrap. */
+       private final Bootstrap bootstrap;
+
+       /** The serializer to be used for (de-)serializing messages. */
+       private final MessageSerializer<REQ, RESP> messageSerializer;
+
+       /** Statistics tracker. */
+       private final KvStateRequestStats stats;
+
+       /** Established connections. */
+       private final Map<KvStateServerAddress, EstablishedConnection> 
establishedConnections = new ConcurrentHashMap<>();
+
+       /** Pending connections. */
+       private final Map<KvStateServerAddress, PendingConnection> 
pendingConnections = new ConcurrentHashMap<>();
+
+       /** Atomic shut down flag. */
+       private final AtomicBoolean shutDown = new AtomicBoolean();
+
+       /**
+        * Creates a client with the specified number of event loop threads.
+        *
+        * @param clientName the name of the client.
+        * @param numEventLoopThreads number of event loop threads (minimum 1).
+        * @param serializer the serializer used to (de-)serialize messages.
+        * @param stats the statistics collector.
+        */
+       public Client(
+                       final String clientName,
+                       final int numEventLoopThreads,
+                       final MessageSerializer<REQ, RESP> serializer,
+                       final KvStateRequestStats stats) {
+
+               Preconditions.checkArgument(numEventLoopThreads >= 1,
+                               "Non-positive number of event loop threads.");
+
+               this.clientName = Preconditions.checkNotNull(clientName);
+               this.messageSerializer = Preconditions.checkNotNull(serializer);
+               this.stats = Preconditions.checkNotNull(stats);
+
+               final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                               .setDaemon(true)
+                               .setNameFormat("Flink " + clientName + " Event 
Loop Thread %d")
+                               .build();
+
+               final EventLoopGroup nioGroup = new 
NioEventLoopGroup(numEventLoopThreads, threadFactory);
+               final ByteBufAllocator bufferPool = new 
NettyBufferPool(numEventLoopThreads);
+
+               this.bootstrap = new Bootstrap()
+                               .group(nioGroup)
+                               .channel(NioSocketChannel.class)
+                               .option(ChannelOption.ALLOCATOR, bufferPool)
+                               .handler(new 
ChannelInitializer<SocketChannel>() {
+                                       @Override
+                                       protected void 
initChannel(SocketChannel channel) throws Exception {
+                                               channel.pipeline()
+                                                               .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+                                                               .addLast(new 
ChunkedWriteHandler());
+                                       }
+                               });
+       }
+
+       /**
+        * Returns the name of this client.
+        *
+        * @return the client name that was passed to the constructor.
+        */
+       public String getClientName() {
+               return clientName;
+       }
+
+       /**
+        * Sends the request to the server at the given address.
+        *
+        * <p>An established connection to the server is reused if there is one. Otherwise the
+        * request is handed to the pending connection for that server. The caller that
+        * manages to register the pending connection is the one that triggers the connect.
+        *
+        * @param serverAddress address of the server to send the request to.
+        * @param request the request to be sent.
+        * @return Future holding the response. It is failed with an
+        *         {@link IllegalStateException} if the client has already been shut down.
+        */
+       public CompletableFuture<RESP> sendRequest(final KvStateServerAddress serverAddress, final REQ request) {
+               if (shutDown.get()) {
+                       return FutureUtils.getFailedFuture(new IllegalStateException("Shut down"));
+               }
+
+               final EstablishedConnection existing = establishedConnections.get(serverAddress);
+               if (existing != null) {
+                       return existing.sendRequest(request);
+               }
+
+               final PendingConnection inProgress = pendingConnections.get(serverAddress);
+               if (inProgress != null) {
+                       // There was a race, use the existing pending connection.
+                       return inProgress.sendRequest(request);
+               }
+
+               // We try to connect to the server.
+               final PendingConnection candidate = new PendingConnection(serverAddress, messageSerializer);
+               final PendingConnection winner = pendingConnections.putIfAbsent(serverAddress, candidate);
+               if (winner != null) {
+                       // There was a race, use the existing pending connection.
+                       return winner.sendRequest(request);
+               }
+
+               // OK, we are responsible to connect.
+               bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()).addListener(candidate);
+               return candidate.sendRequest(request);
+       }
+
+       /**
+        * Shuts down the client and closes all connections.
+        *
+        * <p>After a call to this method, all returned futures will be failed. Only the
+        * first call has an effect.
+        */
+       public void shutdown() {
+               if (!shutDown.compareAndSet(false, true)) {
+                       // Already shut down by an earlier call.
+                       return;
+               }
+
+               for (Map.Entry<KvStateServerAddress, EstablishedConnection> entry : establishedConnections.entrySet()) {
+                       if (establishedConnections.remove(entry.getKey(), entry.getValue())) {
+                               entry.getValue().close();
+                       }
+               }
+
+               for (Map.Entry<KvStateServerAddress, PendingConnection> entry : pendingConnections.entrySet()) {
+                       if (pendingConnections.remove(entry.getKey()) != null) {
+                               entry.getValue().close();
+                       }
+               }
+
+               if (bootstrap == null) {
+                       return;
+               }
+
+               final EventLoopGroup eventLoopGroup = bootstrap.group();
+               if (eventLoopGroup != null) {
+                       eventLoopGroup.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+               }
+       }
+
+       /**
+        * A pending connection that is in the process of connecting.
+        */
+       private class PendingConnection implements ChannelFutureListener {
+
+               /** Lock to guard the connect call, channel hand in, etc. */
+               private final Object connectLock = new Object();
+
+               /** Address of the server we are connecting to. */
+               private final KvStateServerAddress serverAddress;
+
+               private final MessageSerializer<REQ, RESP> serializer;
+
+               /** Queue of requests while connecting. */
+               private final ArrayDeque<PendingRequest> queuedRequests = new 
ArrayDeque<>();
+
+               /** The established connection after the connect succeeds. */
+               private EstablishedConnection established;
+
+               /** Closed flag. */
+               private boolean closed;
+
+               /** Failure cause if something goes wrong. */
+               private Throwable failureCause;
+
+               /**
+                * Creates a pending connection to the given server.
+                *
+                * @param serverAddress Address of the server to connect to.
+                * @param serializer Serializer that is passed on to the established
+                *                   connection once the connect succeeds.
+                */
+               private PendingConnection(
+                               final KvStateServerAddress serverAddress,
+                               final MessageSerializer<REQ, RESP> serializer) {
+                       this.serverAddress = serverAddress;
+                       this.serializer = serializer;
+               }
+
+               /**
+                * Called when the connect attempt for this pending connection has finished.
+                *
+                * <p>On success the channel is handed in. On failure this pending connection is
+                * unregistered and closed with the failure cause, which fails all queued requests.
+                *
+                * @param future the completed connect future.
+                */
+               @Override
+               public void operationComplete(ChannelFuture future) throws Exception {
+                       if (future.isSuccess()) {
+                               handInChannel(future.channel());
+                       } else {
+                               // Unregister this failed attempt. Otherwise it stays in the map forever
+                               // and every later request to this server is answered with the stale
+                               // failure instead of triggering a new connect. The two-argument remove
+                               // only removes the mapping if it still points to this instance.
+                               pendingConnections.remove(serverAddress, this);
+                               close(future.cause());
+                       }
+               }
+
+               /**
+                * Returns a future holding the serialized request result.
+                *
+                * <p>A failed or closed connection answers with a failed future. If the channel
+                * has been established, the call is forwarded to the established connection.
+                * Otherwise the request is queued until the channel is handed in.
+                *
+                * @param request the request to be sent.
+                * @return Future holding the serialized result
+                */
+               public CompletableFuture<RESP> sendRequest(REQ request) {
+                       synchronized (connectLock) {
+                               if (failureCause != null) {
+                                       return FutureUtils.getFailedFuture(failureCause);
+                               }
+
+                               if (closed) {
+                                       return FutureUtils.getFailedFuture(new ClosedChannelException());
+                               }
+
+                               if (established != null) {
+                                       return established.sendRequest(request);
+                               }
+
+                               // Queue this and handle when connected
+                               final PendingRequest queued = new PendingRequest(request);
+                               queuedRequests.add(queued);
+                               return queued;
+                       }
+               }
+
+               /**
+                * Hands in a channel after a successful connection.
+                *
+                * <p>If this pending connection has been closed or has failed in the meantime, the
+                * channel is closed right away. Otherwise an established connection is created,
+                * all queued requests are sent over it, and it is published in place of this
+                * pending connection.
+                *
+                * @param channel Channel to hand in
+                */
+               private void handInChannel(Channel channel) {
+                       synchronized (connectLock) {
+                               if (closed || failureCause != null) {
+                                       // Close the channel and we are done. Any queued requests
+                                       // are removed on the close/failure call and after that no
+                                       // new ones can be enqueued.
+                                       channel.close();
+                               } else {
+                                       established = new EstablishedConnection(serverAddress, serializer, channel);
+
+                                       while (!queuedRequests.isEmpty()) {
+                                               final PendingRequest pending = queuedRequests.poll();
+
+                                               // Forward the outcome directly from the source future. Chaining
+                                               // thenAccept(..).exceptionally(..) would pass on the failure
+                                               // wrapped in a CompletionException, so queued requests would fail
+                                               // with a different exception than requests sent directly.
+                                               established.sendRequest(pending.request).whenComplete(
+                                                               (response, throwable) -> {
+                                                                       if (throwable != null) {
+                                                                               pending.completeExceptionally(throwable);
+                                                                       } else {
+                                                                               pending.complete(response);
+                                                                       }
+                                                               });
+                                       }
+
+                                       // Publish the channel for the general public
+                                       establishedConnections.put(serverAddress, established);
+                                       pendingConnections.remove(serverAddress);
+
+                                       // Check shut down for possible race with shut down. We
+                                       // don't want any lingering connections after shut down,
+                                       // which can happen if we don't check this here.
+                                       if (shutDown.get()) {
+                                               if (establishedConnections.remove(serverAddress, established)) {
+                                                       established.close();
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               /**
+                * Closes this connection with a {@link ClosedChannelException} as the cause.
+                */
+               private void close() {
+                       close(new ClosedChannelException());
+               }
+
+               /**
+                * Close the connecting channel with an Exception (can be 
{@code null})
+                * or forward to the established channel.
+                */
+               private void close(Throwable cause) {
+                       synchronized (connectLock) {
+                               if (!closed) {
+                                       if (failureCause == null) {
+                                               failureCause = cause;
+                                       }
+
+                                       if (established != null) {
+                                               established.close();
+                                       } else {
+                                               PendingRequest pending;
+                                               while ((pending = 
queuedRequests.poll()) != null) {
+                                                       
pending.completeExceptionally(cause);
+                                               }
+                                       }
+                                       closed = true;
+                               }
+                       }
+               }
+
+               /**
+                * Describes this pending connection. The state is read under the connect lock.
+                */
+               @Override
+               public String toString() {
+                       final StringBuilder description = new StringBuilder("PendingConnection{");
+                       synchronized (connectLock) {
+                               description
+                                               .append("serverAddress=").append(serverAddress)
+                                               .append(", queuedRequests=").append(queuedRequests.size())
+                                               .append(", established=").append(established != null)
+                                               .append(", closed=").append(closed);
+                       }
+                       return description.append('}').toString();
+               }
+
+               /**
+                * A pending request queued while the channel is connecting.
+                *
+                * <p>The instance itself is the future that is returned to the caller.
+                */
+               private final class PendingRequest extends 
CompletableFuture<RESP> {
+
+                       /** The request to send once the channel has been handed in. */
+                       private final REQ request;
+
+                       private PendingRequest(REQ request) {
+                               this.request = request;
+                       }
+               }
+       }
+
+       /**
+        * An established connection that wraps the actual channel instance and 
is
+        * registered at the {@link ClientHandler} for callbacks.
+        */
+       private class EstablishedConnection implements 
ClientHandlerCallback<RESP> {
+
+               /** Address of the server we are connected to. */
+               private final KvStateServerAddress serverAddress;
+
+               /** The actual TCP channel. */
+               private final Channel channel;
+
+               /** Pending requests keyed by request ID. */
+               private final ConcurrentHashMap<Long, 
TimestampedCompletableFuture> pendingRequests = new ConcurrentHashMap<>();
+
+               /** Current request number used to assign unique request IDs. */
+               private final AtomicLong requestCount = new AtomicLong();
+
+               /** Reference to a failure that was reported by the channel. */
+               private final AtomicReference<Throwable> failureCause = new 
AtomicReference<>();
+
+               /**
+                * Creates an established connection with the given channel.
+                *
+                * <p>A {@link ClientHandler} that calls back into this connection is appended to
+                * the channel pipeline, and the connection is reported as active to the stats.
+                *
+                * @param serverAddress Address of the server connected to
+                * @param serializer Serializer handed to the {@link ClientHandler}
+                * @param channel The actual TCP channel
+                */
+               EstablishedConnection(
+                               final KvStateServerAddress serverAddress,
+                               final MessageSerializer<REQ, RESP> serializer,
+                               final Channel channel) {
+
+                       this.serverAddress = 
Preconditions.checkNotNull(serverAddress);
+                       this.channel = Preconditions.checkNotNull(channel);
+
+                       // Add the client handler with the callback
+                       channel.pipeline().addLast(
+                                       getClientName() + " Handler",
+                                       new ClientHandler<>(clientName, 
serializer, this)
+                       );
+
+                       stats.reportActiveConnection();
+               }
+
+               /**
+                * Closes the channel with a {@link ClosedChannelException} as the cause.
+                */
+               void close() {
+                       close(new ClosedChannelException());
+               }
+
+               /**
+                * Close the channel with a cause.
+                *
+                * @param cause The cause to close the channel with.
+                * @return Channel close future
+                */
+               private boolean close(Throwable cause) {
+                       if (failureCause.compareAndSet(null, cause)) {
+                               channel.close();
+                               stats.reportInactiveConnection();
+
+                               for (long requestId : pendingRequests.keySet()) 
{
+                                       TimestampedCompletableFuture pending = 
pendingRequests.remove(requestId);
+                                       if (pending != null && 
pending.completeExceptionally(cause)) {
+                                               stats.reportFailedRequest();
+                                       }
+                               }
+                               return true;
+                       }
+                       return false;
+               }
+
+               /**
+                * Returns a future holding the serialized request result.
+                *
+                * <p>The request is registered under a fresh request ID, serialized and written to
+                * the channel. Any failure while doing so fails the returned future; the method
+                * itself does not throw.
+                *
+                * @param request the request to be sent.
+                * @return Future holding the serialized result
+                */
+               CompletableFuture<RESP> sendRequest(REQ request) {
+                       final TimestampedCompletableFuture requestPromiseTs =
+                                       new TimestampedCompletableFuture(System.nanoTime());
+                       final long requestId = requestCount.getAndIncrement();
+
+                       // Tracks whether the request has been counted, so that a failure is only
+                       // reported for requests the stats know about.
+                       boolean requestReported = false;
+                       try {
+                               pendingRequests.put(requestId, requestPromiseTs);
+
+                               stats.reportRequest();
+                               requestReported = true;
+
+                               ByteBuf buf = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
+
+                               channel.writeAndFlush(buf).addListener((ChannelFutureListener) future -> {
+                                       if (!future.isSuccess()) {
+                                               // Fail the promise if the write failed.
+                                               TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+                                               if (pending != null && pending.completeExceptionally(future.cause())) {
+                                                       stats.reportFailedRequest();
+                                               }
+                                       }
+                               });
+
+                               // Check failure for possible race. We don't want any lingering
+                               // promises after a failure, which can happen if we don't check
+                               // this here. Note that close is treated as a failure as well.
+                               Throwable failure = failureCause.get();
+                               if (failure != null) {
+                                       // Remove from pending requests to guard against concurrent
+                                       // removal and to make sure that we only count it once as failed.
+                                       TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+                                       if (pending != null && pending.completeExceptionally(failure)) {
+                                               stats.reportFailedRequest();
+                                       }
+                               }
+                       } catch (Throwable t) {
+                               // Unregister the promise, otherwise it lingers in the pending requests
+                               // until the connection is closed. completeExceptionally only returns
+                               // true for the caller that completes the promise, so the failure is
+                               // counted at most once.
+                               pendingRequests.remove(requestId, requestPromiseTs);
+                               if (requestPromiseTs.completeExceptionally(t) && requestReported) {
+                                       stats.reportFailedRequest();
+                               }
+                       }
+
+                       return requestPromiseTs;
+               }
+
+               /**
+                * Completes the pending request with the given ID and reports how long it took.
+                * Nothing happens if the request is unknown or has already been completed.
+                */
+               @Override
+               public void onRequestResult(long requestId, RESP response) {
+                       final TimestampedCompletableFuture answered = pendingRequests.remove(requestId);
+                       if (answered == null || !answered.complete(response)) {
+                               return;
+                       }
+
+                       final long elapsedNanos = System.nanoTime() - answered.getTimestamp();
+                       stats.reportSuccessfulRequest(elapsedNanos / 1_000_000L);
+               }
+
+               /**
+                * Fails the pending request with the given ID and reports the failure.
+                * Nothing happens if the request is unknown or has already been completed.
+                */
+               @Override
+               public void onRequestFailure(long requestId, Throwable cause) {
+                       final TimestampedCompletableFuture failed = pendingRequests.remove(requestId);
+                       if (failed == null) {
+                               return;
+                       }
+
+                       if (failed.completeExceptionally(cause)) {
+                               stats.reportFailedRequest();
+                       }
+               }
+
+               /**
+                * Closes this connection with the given cause and unregisters it from the client.
+                */
+               @Override
+               public void onFailure(Throwable cause) {
+                       final boolean closedByThisCall = close(cause);
+                       if (!closedByThisCall) {
+                               return;
+                       }
+
+                       // Remove from established channels, otherwise future
+                       // requests will be handled by this failed channel.
+                       establishedConnections.remove(serverAddress, this);
+               }
+
+               /**
+                * Describes this connection, including the number of pending requests.
+                */
+               @Override
+               public String toString() {
+                       return new StringBuilder("EstablishedConnection{")
+                                       .append("serverAddress=").append(serverAddress)
+                                       .append(", channel=").append(channel)
+                                       .append(", pendingRequests=").append(pendingRequests.size())
+                                       .append(", requestCount=").append(requestCount)
+                                       .append(", failureCause=").append(failureCause)
+                                       .append('}')
+                                       .toString();
+               }
+
+               /**
+                * Pair of promise and a timestamp.
+                *
+                * <p>The timestamp is used to compute the request duration that is reported to
+                * the stats when the request succeeds.
+                */
+               private class TimestampedCompletableFuture extends 
CompletableFuture<RESP> {
+
+                       /** Timestamp in nanoseconds, as handed to the constructor. */
+                       private final long timestampInNanos;
+
+                       TimestampedCompletableFuture(long timestampInNanos) {
+                               this.timestampInNanos = timestampInNanos;
+                       }
+
+                       /** Returns the timestamp in nanoseconds. */
+                       public long getTimestamp() {
+                               return timestampInNanos;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
new file mode 100644
index 0000000..fc9b1d4
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * The handler used by a {@link Client} to handle incoming messages.
+ *
+ * @param <REQ> the type of request the client will send.
+ * @param <RESP> the type of response the client expects to receive.
+ */
+@Internal
+public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> 
extends ChannelInboundHandlerAdapter {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ClientHandler.class);
+
+       private final String clientName;
+
+       private final MessageSerializer<REQ, RESP> serializer;
+
+       private final ClientHandlerCallback<RESP> callback;
+
+       /**
+        * Creates a handler with the callback.
+        *
+        * <p>Each argument is passed through {@code Preconditions.checkNotNull}.
+        *
+        * @param clientName the name of the client.
+        * @param serializer the serializer used to (de-)serialize messages.
+        * @param callback Callback for responses.
+        */
+       public ClientHandler(
+                       final String clientName,
+                       final MessageSerializer<REQ, RESP> serializer,
+                       final ClientHandlerCallback<RESP> callback) {
+
+               this.clientName = Preconditions.checkNotNull(clientName);
+               this.serializer = Preconditions.checkNotNull(serializer);
+               this.callback = Preconditions.checkNotNull(callback);
+       }
+
+       @Override
+       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+               try {
+                       ByteBuf buf = (ByteBuf) msg;
+                       MessageType msgType = 
MessageSerializer.deserializeHeader(buf);
+
+                       if (msgType == MessageType.REQUEST_RESULT) {
+                               long requestId = 
MessageSerializer.getRequestId(buf);
+                               RESP result = 
serializer.deserializeResponse(buf);
+                               callback.onRequestResult(requestId, result);
+                       } else if (msgType == MessageType.REQUEST_FAILURE) {
+                               RequestFailure failure = 
MessageSerializer.deserializeRequestFailure(buf);
+                               
callback.onRequestFailure(failure.getRequestId(), failure.getCause());
+                       } else if (msgType == MessageType.SERVER_FAILURE) {
+                               throw 
MessageSerializer.deserializeServerFailure(buf);
+                       } else {
+                               throw new IllegalStateException("Unexpected 
response type '" + msgType + "'");
+                       }
+               } catch (Throwable t1) {
+                       try {
+                               callback.onFailure(t1);
+                       } catch (Throwable t2) {
+                               LOG.error("Failed to notify callback about 
failure", t2);
+                       }
+               } finally {
+                       ReferenceCountUtil.release(msg);
+               }
+       }
+
+       /**
+        * Forwards an exception caught in the channel pipeline to the callback.
+        *
+        * <p>Anything thrown by the callback is logged and not propagated.
+        */
+       @Override
+       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+               try {
+                       callback.onFailure(cause);
+               } catch (Throwable t) {
+                       LOG.error("Failed to notify callback about failure", t);
+               }
+       }
+
+       /**
+        * Reports a {@link ClosedChannelException} to the callback when the channel
+        * becomes inactive.
+        *
+        * <p>Anything thrown by the callback is logged and not propagated.
+        */
+       @Override
+       public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
+               // Only the client is expected to close the channel; any other close
+               // indicates a failure. This method is invoked in both cases, though,
+               // so the callback is expected to ignore the notification if it closed
+               // the channel itself.
+               try {
+                       callback.onFailure(new ClosedChannelException());
+               } catch (Throwable t) {
+                       LOG.error("Failed to notify callback about failure", t);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
new file mode 100644
index 0000000..00ce1ed
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+
+/**
+ * Callback for {@link ClientHandler}.
+ */
+@Internal
+public interface ClientHandlerCallback<RESP extends MessageBody> {
+
+       /**
+        * Called on a successful request.
+        *
+        * @param requestId                     ID of the request
+        * @param response                      The received response
+        */
+       void onRequestResult(long requestId, RESP response);
+
+       /**
+        * Called on a failed request.
+        *
+        * @param requestId ID of the request
+        * @param cause     Cause of the request failure
+        */
+       void onRequestFailure(long requestId, Throwable cause);
+
+       /**
+        * Called on any failure, which is not related to a specific request.
+        *
+        * <p>This can be for example a caught Exception in the channel pipeline
+        * or an unexpected channel close.
+        *
+        * @param cause Cause of the failure
+        */
+       void onFailure(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
new file mode 100644
index 0000000..f26c267
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The base class for every message exchanged during the communication between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ *
+ * <p>Every such message should also have a {@link MessageDeserializer}.
+ */
+@Internal
+public abstract class MessageBody {
+
+       /**
+        * Serializes the message into a byte array.
+        * @return A byte array with the serialized content of the message.
+        */
+       public abstract byte[] serialize();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
new file mode 100644
index 0000000..436fb82
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+/**
+ * A utility used to deserialize a {@link MessageBody message}.
+ * @param <M> The type of the message to be deserialized.
+ *           It has to extend {@link MessageBody}
+ */
+@Internal
+public interface MessageDeserializer<M extends MessageBody> {
+
+       /**
+        * Deserializes a message contained in a byte buffer.
+        * @param buf the buffer containing the message.
+        * @return The deserialized message.
+        */
+       M deserializeMessage(ByteBuf buf);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
index 32bca64..c0a0d32 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.queryablestate.network.messages;
 
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -37,8 +33,8 @@ import java.io.ObjectOutputStream;
 
 /**
  * Serialization and deserialization of messages exchanged between
- * {@link org.apache.flink.queryablestate.client.KvStateClient client} and
- * {@link org.apache.flink.queryablestate.server.KvStateServerImpl server}.
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
  *
  * <p>The binary messages have the following format:
  *
@@ -52,8 +48,12 @@ import java.io.ObjectOutputStream;
  * </pre>
  *
  * <p>The concrete content of a message depends on the {@link MessageType}.
+ *
+ * @param <REQ>                Type of the requests of the protocol.
+ * @param <RESP>       Type of the responses of the protocol.
  */
-public final class MessageSerializer {
+@Internal
+public final class MessageSerializer<REQ extends MessageBody, RESP extends 
MessageBody> {
 
        /** The serialization version ID. */
        private static final int VERSION = 0x79a1b710;
@@ -64,78 +64,58 @@ public final class MessageSerializer {
        /** Byte length of the request id. */
        private static final int REQUEST_ID_SIZE = Long.BYTES;
 
+       /** The deserializer used to rebuild {@link MessageBody client requests} 
from their serialized form. */
+       private final MessageDeserializer<REQ> requestDeserializer;
+
+       /** The deserializer used to rebuild {@link MessageBody server responses} 
from their serialized form. */
+       private final MessageDeserializer<RESP> responseDeserializer;
+
+       public MessageSerializer(MessageDeserializer<REQ> requestDeser, 
MessageDeserializer<RESP> responseDeser) {
+               requestDeserializer = Preconditions.checkNotNull(requestDeser);
+               responseDeserializer = 
Preconditions.checkNotNull(responseDeser);
+       }
+
        // 
------------------------------------------------------------------------
        // Serialization
        // 
------------------------------------------------------------------------
 
        /**
-        * Allocates a buffer and serializes the KvState request into it.
+        * Serializes the request sent to the
+        * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
         *
-        * @param alloc                     ByteBuf allocator for the buffer to
-        *                                  serialize message into
-        * @param requestId                 ID for this request
-        * @param kvStateId                 ID of the requested KvState instance
-        * @param serializedKeyAndNamespace Serialized key and namespace to 
request
-        *                                  from the KvState instance.
-        * @return Serialized KvState request message
+        * @param alloc                 The {@link ByteBufAllocator} used to 
allocate the buffer to serialize the message into.
+        * @param requestId             The id of the request to which the 
message refers.
+        * @param request               The request to be serialized.
+        * @return A {@link ByteBuf} containing the serialized message.
         */
-       public static ByteBuf serializeKvStateRequest(
-                       ByteBufAllocator alloc,
-                       long requestId,
-                       KvStateID kvStateId,
-                       byte[] serializedKeyAndNamespace) {
-
-               // Header + request ID + KvState ID + Serialized namespace
-               int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + 
AbstractID.SIZE + (Integer.BYTES + serializedKeyAndNamespace.length);
-               ByteBuf buf = alloc.ioBuffer(frameLength + 4); // +4 for frame 
length
-
-               buf.writeInt(frameLength);
-
-               writeHeader(buf, MessageType.REQUEST);
-
-               buf.writeLong(requestId);
-               buf.writeLong(kvStateId.getLowerPart());
-               buf.writeLong(kvStateId.getUpperPart());
-               buf.writeInt(serializedKeyAndNamespace.length);
-               buf.writeBytes(serializedKeyAndNamespace);
-
-               return buf;
+       public static <REQ extends MessageBody> ByteBuf serializeRequest(
+                       final ByteBufAllocator alloc,
+                       final long requestId,
+                       final REQ request) {
+               Preconditions.checkNotNull(request);
+               return writePayload(alloc, requestId, MessageType.REQUEST, 
request.serialize());
        }
 
        /**
-        * Allocates a buffer and serializes the KvState request result into it.
+        * Serializes the response sent to the
+        * {@link org.apache.flink.queryablestate.network.Client}.
         *
-        * @param alloc             ByteBuf allocator for the buffer to 
serialize message into
-        * @param requestId         ID for this request
-        * @param serializedResult  Serialized Result
-        * @return Serialized KvState request result message
+        * @param alloc                 The {@link ByteBufAllocator} used to 
allocate the buffer to serialize the message into.
+        * @param requestId             The id of the request to which the 
message refers.
+        * @param response              The response to be serialized.
+        * @return A {@link ByteBuf} containing the serialized message.
         */
-       public static ByteBuf serializeKvStateRequestResult(
-                       ByteBufAllocator alloc,
-                       long requestId,
-                       byte[] serializedResult) {
-
-               Preconditions.checkNotNull(serializedResult, "Serialized 
result");
-
-               // Header + request ID + serialized result
-               int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + 4 + 
serializedResult.length;
-
-               // TODO: 10/5/17 there was a bug all this time?
-               ByteBuf buf = alloc.ioBuffer(frameLength + 4);
-
-               buf.writeInt(frameLength);
-               writeHeader(buf, MessageType.REQUEST_RESULT);
-               buf.writeLong(requestId);
-
-               buf.writeInt(serializedResult.length);
-               buf.writeBytes(serializedResult);
-
-               return buf;
+       public static <RESP extends MessageBody> ByteBuf serializeResponse(
+                       final ByteBufAllocator alloc,
+                       final long requestId,
+                       final RESP response) {
+               Preconditions.checkNotNull(response);
+               return writePayload(alloc, requestId, 
MessageType.REQUEST_RESULT, response.serialize());
        }
 
        /**
         * Serializes the exception containing the failure message sent to the
-        * {@link org.apache.flink.queryablestate.client.KvStateClient} in case 
of
+        * {@link org.apache.flink.queryablestate.network.Client} in case of
         * protocol related errors.
         *
         * @param alloc                 The {@link ByteBufAllocator} used to 
allocate the buffer to serialize the message into.
@@ -143,7 +123,7 @@ public final class MessageSerializer {
         * @param cause                 The exception thrown at the server.
         * @return A {@link ByteBuf} containing the serialized message.
         */
-       public static ByteBuf serializeKvStateRequestFailure(
+       public static ByteBuf serializeRequestFailure(
                        final ByteBufAllocator alloc,
                        final long requestId,
                        final Throwable cause) throws IOException {
@@ -168,7 +148,7 @@ public final class MessageSerializer {
 
        /**
         * Serializes the failure message sent to the
-        * {@link org.apache.flink.queryablestate.client.KvStateClient} in case 
of
+        * {@link org.apache.flink.queryablestate.network.Client} in case of
         * server related errors.
         *
         * @param alloc                 The {@link ByteBufAllocator} used to 
allocate the buffer to serialize the message into.
@@ -207,6 +187,31 @@ public final class MessageSerializer {
                buf.writeInt(messageType.ordinal());
        }
 
+       /**
+        * Helper for serializing the messages.
+        *
+        * @param alloc                 The {@link ByteBufAllocator} used to 
allocate the buffer to serialize the message into.
+        * @param requestId             The id of the request to which the 
message refers.
+        * @param messageType   The {@link MessageType type of the message}.
+        * @param payload               The serialized version of the message.
+        * @return A {@link ByteBuf} containing the serialized message.
+        */
+       private static ByteBuf writePayload(
+                       final ByteBufAllocator alloc,
+                       final long requestId,
+                       final MessageType messageType,
+                       final byte[] payload) {
+
+               final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + 
payload.length;
+               final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES);
+
+               buf.writeInt(frameLength);
+               writeHeader(buf, messageType);
+               buf.writeLong(requestId);
+               buf.writeBytes(payload);
+               return buf;
+       }
+
        // 
------------------------------------------------------------------------
        // Deserialization
        // 
------------------------------------------------------------------------
@@ -230,71 +235,54 @@ public final class MessageSerializer {
                // fetching the message type
                int msgType = buf.readInt();
                MessageType[] values = MessageType.values();
-               Preconditions.checkState(msgType >= 0 && msgType <= 
values.length,
+               Preconditions.checkState(msgType >= 0 && msgType < 
values.length,
                                "Illegal message type with index " + msgType + 
'.');
                return values[msgType];
        }
 
        /**
-        * Deserializes the KvState request message.
-        *
-        * <p><strong>Important</strong>: the returned buffer is sliced from the
-        * incoming ByteBuf stream and retained. Therefore, it needs to be 
recycled
-        * by the consumer.
-        *
-        * @param buf Buffer to deserialize (expected to be positioned after 
header)
-        * @return Deserialized KvStateRequest
+        * De-serializes the request id of the message.
+        * <pre>
+        *  <b>The buffer is expected to be at the request id position.</b>
+        * </pre>
+        * @param buf   The {@link ByteBuf} containing the serialized request 
id.
+        * @return              The request id.
         */
-       public static KvStateRequest deserializeKvStateRequest(ByteBuf buf) {
-               long requestId = buf.readLong();
-               KvStateID kvStateId = new KvStateID(buf.readLong(), 
buf.readLong());
-
-               // Serialized key and namespace
-               int length = buf.readInt();
-
-               if (length < 0) {
-                       throw new IllegalArgumentException("Negative length for 
serialized key and namespace. " +
-                                       "This indicates a serialization 
error.");
-               }
-
-               // Copy the buffer in order to be able to safely recycle the 
ByteBuf
-               byte[] serializedKeyAndNamespace = new byte[length];
-               if (length > 0) {
-                       buf.readBytes(serializedKeyAndNamespace);
-               }
-
-               return new KvStateRequest(requestId, kvStateId, 
serializedKeyAndNamespace);
+       public static long getRequestId(final ByteBuf buf) {
+               return buf.readLong();
        }
 
        /**
-        * Deserializes the KvState request result.
-        *
-        * @param buf Buffer to deserialize (expected to be positioned after 
header)
-        * @return Deserialized KvStateRequestResult
+        * De-serializes the request sent to the
+        * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
+        * <pre>
+        *  <b>The buffer is expected to be at the request position.</b>
+        * </pre>
+        * @param buf   The {@link ByteBuf} containing the serialized request.
+        * @return              The request.
         */
-       public static KvStateRequestResult 
deserializeKvStateRequestResult(ByteBuf buf) {
-               long requestId = buf.readLong();
-
-               // Serialized KvState
-               int length = buf.readInt();
-
-               if (length < 0) {
-                       throw new IllegalArgumentException("Negative length for 
serialized result. " +
-                                       "This indicates a serialization 
error.");
-               }
-
-               byte[] serializedValue = new byte[length];
-
-               if (length > 0) {
-                       buf.readBytes(serializedValue);
-               }
+       public REQ deserializeRequest(final ByteBuf buf) {
+               Preconditions.checkNotNull(buf);
+               return requestDeserializer.deserializeMessage(buf);
+       }
 
-               return new KvStateRequestResult(requestId, serializedValue);
+       /**
+        * De-serializes the response sent to the
+        * {@link org.apache.flink.queryablestate.network.Client}.
+        * <pre>
+        *  <b>The buffer is expected to be at the response position.</b>
+        * </pre>
+        * @param buf   The {@link ByteBuf} containing the serialized response.
+        * @return              The response.
+        */
+       public RESP deserializeResponse(final ByteBuf buf) {
+               Preconditions.checkNotNull(buf);
+               return responseDeserializer.deserializeMessage(buf);
        }
 
        /**
-        * De-serializes the {@link KvStateRequestFailure} sent to the
-        * {@link org.apache.flink.queryablestate.client.KvStateClient} in case 
of
+        * De-serializes the {@link RequestFailure} sent to the
+        * {@link org.apache.flink.queryablestate.network.Client} in case of
         * protocol related errors.
         * <pre>
         *  <b>The buffer is expected to be at the correct position.</b>
@@ -302,7 +290,7 @@ public final class MessageSerializer {
         * @param buf   The {@link ByteBuf} containing the serialized failure 
message.
         * @return              The failure message.
         */
-       public static KvStateRequestFailure 
deserializeKvStateRequestFailure(final ByteBuf buf) throws IOException, 
ClassNotFoundException {
+       public static RequestFailure deserializeRequestFailure(final ByteBuf 
buf) throws IOException, ClassNotFoundException {
                long requestId = buf.readLong();
 
                Throwable cause;
@@ -310,12 +298,12 @@ public final class MessageSerializer {
                                ObjectInputStream in = new 
ObjectInputStream(bis)) {
                        cause = (Throwable) in.readObject();
                }
-               return new KvStateRequestFailure(requestId, cause);
+               return new RequestFailure(requestId, cause);
        }
 
        /**
         * De-serializes the failure message sent to the
-        * {@link org.apache.flink.queryablestate.client.KvStateClient} in case 
of
+        * {@link org.apache.flink.queryablestate.network.Client} in case of
         * server related errors.
         * <pre>
         *  <b>The buffer is expected to be at the correct position.</b>

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
index 4e4435d..562ce93 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.queryablestate.network.messages;
 
+import org.apache.flink.annotation.Internal;
+
 /**
  * Expected message types during the communication between
- * {@link org.apache.flink.queryablestate.client.KvStateClient state client} 
and
- * {@link org.apache.flink.queryablestate.server.KvStateServerImpl state 
server}.
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
  */
+@Internal
 public enum MessageType {
 
        /** The message is a request. */

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
new file mode 100644
index 0000000..106199f
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A message indicating a protocol-related error.
+ */
+@Internal
+public class RequestFailure {
+
+       /** ID of the request this failure responds to. */
+       private final long requestId;
+
+       /** Failure cause. Not allowed to be a user type. */
+       private final Throwable cause;
+
+       /**
+        * Creates a failure response to a {@link MessageBody}.
+        *
+        * @param requestId ID of the request this failure responds to
+        * @param cause     Failure cause (not allowed to be a user type)
+        */
+       public RequestFailure(long requestId, Throwable cause) {
+               this.requestId = requestId;
+               this.cause = cause;
+       }
+
+       /**
+        * Returns the ID of the request this failure responds to.
+        *
+        * @return ID of the request this failure responds to
+        */
+       public long getRequestId() {
+               return requestId;
+       }
+
+       /**
+        * Returns the failure cause.
+        *
+        * @return Failure cause
+        */
+       public Throwable getCause() {
+               return cause;
+       }
+
+       @Override
+       public String toString() {
+               return "RequestFailure{" +
+                               "requestId=" + requestId +
+                               ", cause=" + cause +
+                               '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java
deleted file mode 100644
index f10969e..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.server;
-
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-/**
- * A {@link ByteBuf} instance to be consumed in chunks by {@link 
ChunkedWriteHandler},
- * respecting the high and low watermarks.
- *
- * @see <a 
href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High
 Watermarks</a>
- */
-public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
-
-       /** The buffer to chunk. */
-       private final ByteBuf buf;
-
-       /** Size of chunks. */
-       private final int chunkSize;
-
-       /** Closed flag. */
-       private boolean isClosed;
-
-       /** End of input flag. */
-       private boolean isEndOfInput;
-
-       public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
-               this.buf = Preconditions.checkNotNull(buf, "Buffer");
-               Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk 
size");
-               this.chunkSize = chunkSize;
-       }
-
-       @Override
-       public boolean isEndOfInput() throws Exception {
-               return isClosed || isEndOfInput;
-       }
-
-       @Override
-       public void close() throws Exception {
-               if (!isClosed) {
-                       // If we did not consume the whole buffer yet, we have 
to release
-                       // it here. Otherwise, it's the responsibility of the 
consumer.
-                       if (!isEndOfInput) {
-                               buf.release();
-                       }
-
-                       isClosed = true;
-               }
-       }
-
-       @Override
-       public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
-               if (isClosed) {
-                       return null;
-               } else if (buf.readableBytes() <= chunkSize) {
-                       isEndOfInput = true;
-
-                       // Don't retain as the consumer is responsible to 
release it
-                       return buf.slice();
-               } else {
-                       // Return a chunk sized slice of the buffer. The ref 
count is
-                       // shared with the original buffer. That's why we need 
to retain
-                       // a reference here.
-                       return buf.readSlice(chunkSize).retain();
-               }
-       }
-
-       @Override
-       public String toString() {
-               return "ChunkedByteBuf{" +
-                               "buf=" + buf +
-                               ", chunkSize=" + chunkSize +
-                               ", isClosed=" + isClosed +
-                               ", isEndOfInput=" + isEndOfInput +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index 9a31fca..055a5d0 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -18,31 +18,25 @@
 
 package org.apache.flink.queryablestate.server;
 
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.KvStateRequestStats;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * This handler dispatches asynchronous tasks, which query {@link 
InternalKvState}
@@ -52,257 +46,62 @@ import java.util.concurrent.TimeUnit;
  * query task. The actual query is handled in a separate thread as it might
  * otherwise block the network threads (file I/O etc.).
  */
+@Internal
 @ChannelHandler.Sharable
-public class KvStateServerHandler extends ChannelInboundHandlerAdapter {
+public class KvStateServerHandler extends 
AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(KvStateServerHandler.class);
 
        /** KvState registry holding references to the KvState instances. */
        private final KvStateRegistry registry;
 
-       /** Thread pool for query execution. */
-       private final ExecutorService queryExecutor;
-
-       /** Exposed server statistics. */
-       private final KvStateRequestStats stats;
-
        /**
-        * Create the handler.
+        * Create the handler used by the {@link KvStateServerImpl}.
         *
-        * @param kvStateRegistry Registry to query.
-        * @param queryExecutor   Thread pool for query execution.
-        * @param stats           Exposed server statistics.
+        * @param server the {@link KvStateServerImpl} using the handler.
+        * @param kvStateRegistry registry to query.
+        * @param serializer the {@link MessageSerializer} used to (de-) 
serialize the different messages.
+        * @param stats server statistics collector.
         */
        public KvStateServerHandler(
-                       KvStateRegistry kvStateRegistry,
-                       ExecutorService queryExecutor,
-                       KvStateRequestStats stats) {
+                       final KvStateServerImpl server,
+                       final KvStateRegistry kvStateRegistry,
+                       final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer,
+                       final KvStateRequestStats stats) {
 
-               this.registry = Objects.requireNonNull(kvStateRegistry, 
"KvStateRegistry");
-               this.queryExecutor = Objects.requireNonNull(queryExecutor, 
"Query thread pool");
-               this.stats = Objects.requireNonNull(stats, 
"KvStateRequestStats");
+               super(server, serializer, stats);
+               this.registry = Preconditions.checkNotNull(kvStateRegistry);
        }
 
        @Override
-       public void channelActive(ChannelHandlerContext ctx) throws Exception {
-               stats.reportActiveConnection();
-       }
-
-       @Override
-       public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
-               stats.reportInactiveConnection();
-       }
-
-       @Override
-       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-               KvStateRequest request = null;
+       public CompletableFuture<KvStateResponse> handleRequest(final long 
requestId, final KvStateInternalRequest request) {
+               final CompletableFuture<KvStateResponse> responseFuture = new 
CompletableFuture<>();
 
                try {
-                       ByteBuf buf = (ByteBuf) msg;
-                       MessageType msgType = 
MessageSerializer.deserializeHeader(buf);
-
-                       if (msgType == MessageType.REQUEST) {
-                               // 
------------------------------------------------------------
-                               // Request
-                               // 
------------------------------------------------------------
-                               request = 
MessageSerializer.deserializeKvStateRequest(buf);
-
-                               stats.reportRequest();
-
-                               InternalKvState<?> kvState = 
registry.getKvState(request.getKvStateId());
-
-                               if (kvState != null) {
-                                       // Execute actual query async, because 
it is possibly
-                                       // blocking (e.g. file I/O).
-                                       //
-                                       // A submission failure is not treated 
as fatal.
-                                       queryExecutor.submit(new 
AsyncKvStateQueryTask(ctx, request, kvState, stats));
-                               } else {
-                                       ByteBuf unknown = 
MessageSerializer.serializeKvStateRequestFailure(
-                                                       ctx.alloc(),
-                                                       request.getRequestId(),
-                                                       new 
UnknownKvStateID(request.getKvStateId()));
-
-                                       ctx.writeAndFlush(unknown);
-
-                                       stats.reportFailedRequest();
-                               }
+                       final InternalKvState<?> kvState = 
registry.getKvState(request.getKvStateId());
+                       if (kvState == null) {
+                               responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
                        } else {
-                               // 
------------------------------------------------------------
-                               // Unexpected
-                               // 
------------------------------------------------------------
-                               ByteBuf failure = 
MessageSerializer.serializeServerFailure(
-                                               ctx.alloc(),
-                                               new 
IllegalArgumentException("Unexpected message type " + msgType
-                                                               + ". 
KvStateServerHandler expects "
-                                                               + 
MessageType.REQUEST + " messages."));
-
-                               ctx.writeAndFlush(failure);
-                       }
-               } catch (Throwable t) {
-                       String stringifiedCause = 
ExceptionUtils.stringifyException(t);
-
-                       ByteBuf err;
-                       if (request != null) {
-                               String errMsg = "Failed to handle incoming 
request with ID " +
-                                               request.getRequestId() + ". 
Caused by: " + stringifiedCause;
-                               err = 
MessageSerializer.serializeKvStateRequestFailure(
-                                               ctx.alloc(),
-                                               request.getRequestId(),
-                                               new RuntimeException(errMsg));
-
-                               stats.reportFailedRequest();
-                       } else {
-                               String errMsg = "Failed to handle incoming 
message. Caused by: " + stringifiedCause;
-                               err = MessageSerializer.serializeServerFailure(
-                                               ctx.alloc(),
-                                               new RuntimeException(errMsg));
-                       }
-
-                       ctx.writeAndFlush(err);
-               } finally {
-                       // IMPORTANT: We have to always recycle the incoming 
buffer.
-                       // Otherwise we will leak memory out of Netty's buffer 
pool.
-                       //
-                       // If any operation ever holds on to the buffer, it is 
the
-                       // responsibility of that operation to retain the 
buffer and
-                       // release it later.
-                       ReferenceCountUtil.release(msg);
-               }
-       }
-
-       @Override
-       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-               String stringifiedCause = 
ExceptionUtils.stringifyException(cause);
-               String msg = "Exception in server pipeline. Caused by: " + 
stringifiedCause;
-
-               ByteBuf err = MessageSerializer.serializeServerFailure(
-                               ctx.alloc(),
-                               new RuntimeException(msg));
-
-               ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
-       }
-
-       /**
-        * Task to execute the actual query against the {@link InternalKvState} 
instance.
-        */
-       private static class AsyncKvStateQueryTask implements Runnable {
-
-               private final ChannelHandlerContext ctx;
-
-               private final KvStateRequest request;
-
-               private final InternalKvState<?> kvState;
-
-               private final KvStateRequestStats stats;
-
-               private final long creationNanos;
-
-               public AsyncKvStateQueryTask(
-                               ChannelHandlerContext ctx,
-                               KvStateRequest request,
-                               InternalKvState<?> kvState,
-                               KvStateRequestStats stats) {
-
-                       this.ctx = Objects.requireNonNull(ctx, "Channel handler 
context");
-                       this.request = Objects.requireNonNull(request, "State 
query");
-                       this.kvState = Objects.requireNonNull(kvState, 
"KvState");
-                       this.stats = Objects.requireNonNull(stats, "State query 
stats");
-                       this.creationNanos = System.nanoTime();
-               }
-
-               @Override
-               public void run() {
-                       boolean success = false;
-
-                       try {
-                               if (!ctx.channel().isActive()) {
-                                       return;
-                               }
-
-                               // Query the KvState instance
                                byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
-                               byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
 
+                               byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
                                if (serializedResult != null) {
-                                       // We found some data, success!
-                                       ByteBuf buf = 
MessageSerializer.serializeKvStateRequestResult(
-                                                       ctx.alloc(),
-                                                       request.getRequestId(),
-                                                       serializedResult);
-
-                                       int highWatermark = 
ctx.channel().config().getWriteBufferHighWaterMark();
-
-                                       ChannelFuture write;
-                                       if (buf.readableBytes() <= 
highWatermark) {
-                                               write = ctx.writeAndFlush(buf);
-                                       } else {
-                                               write = ctx.writeAndFlush(new 
ChunkedByteBuf(buf, highWatermark));
-                                       }
-
-                                       write.addListener(new 
QueryResultWriteListener());
-
-                                       success = true;
+                                       responseFuture.complete(new 
KvStateResponse(serializedResult));
                                } else {
-                                       // No data for the key/namespace. This 
is considered to be
-                                       // a failure.
-                                       ByteBuf unknownKey = 
MessageSerializer.serializeKvStateRequestFailure(
-                                                       ctx.alloc(),
-                                                       request.getRequestId(),
-                                                       new 
UnknownKeyOrNamespace());
-
-                                       ctx.writeAndFlush(unknownKey);
-                               }
-                       } catch (Throwable t) {
-                               try {
-                                       String stringifiedCause = 
ExceptionUtils.stringifyException(t);
-                                       String errMsg = "Failed to query state 
backend for query " +
-                                                       request.getRequestId() 
+ ". Caused by: " + stringifiedCause;
-
-                                       ByteBuf err = 
MessageSerializer.serializeKvStateRequestFailure(
-                                                       ctx.alloc(), 
request.getRequestId(), new RuntimeException(errMsg));
-
-                                       ctx.writeAndFlush(err);
-                               } catch (IOException e) {
-                                       LOG.error("Failed to respond with the 
error after failed to query state backend", e);
-                               }
-                       } finally {
-                               if (!success) {
-                                       stats.reportFailedRequest();
+                                       
responseFuture.completeExceptionally(new 
UnknownKeyOrNamespaceException(getServerName()));
                                }
                        }
+                       return responseFuture;
+               } catch (Throwable t) {
+                       String errMsg = "Error while processing request with ID 
" + requestId +
+                                       ". Caused by: " + 
ExceptionUtils.stringifyException(t);
+                       responseFuture.completeExceptionally(new 
RuntimeException(errMsg));
+                       return responseFuture;
                }
+       }
 
-               @Override
-               public String toString() {
-                       return "AsyncKvStateQueryTask{" +
-                                       ", request=" + request +
-                                       ", creationNanos=" + creationNanos +
-                                       '}';
-               }
-
-               /**
-                * Callback after query result has been written.
-                *
-                * <p>Gathers stats and logs errors.
-                */
-               private class QueryResultWriteListener implements 
ChannelFutureListener {
-
-                       @Override
-                       public void operationComplete(ChannelFuture future) 
throws Exception {
-                               long durationNanos = System.nanoTime() - 
creationNanos;
-                               long durationMillis = 
TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
-
-                               if (future.isSuccess()) {
-                                       
stats.reportSuccessfulRequest(durationMillis);
-                               } else {
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Query " + request + 
" failed after " + durationMillis + " ms", future.cause());
-                                       }
-
-                                       stats.reportFailedRequest();
-                               }
-                       }
-               }
+       @Override
+       public void shutdown() {
+               // do nothing
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index 4bf7e24..b4c548a 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -18,213 +18,93 @@
 
 package org.apache.flink.queryablestate.server;
 
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.netty.KvStateRequestStats;
 import org.apache.flink.util.Preconditions;
 
-import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 
 /**
- * Netty-based server answering {@link KvStateRequest} messages.
- *
- * <p>Requests are handled by asynchronous query tasks (see {@link 
KvStateServerHandler.AsyncKvStateQueryTask})
- * that are executed by a separate query Thread pool. This pool is shared among
- * all TCP connections.
- *
- * <p>The incoming pipeline looks as follows:
- * <pre>
- * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler
- * </pre>
- *
- * <p>Received binary messages are expected to contain a frame length field. 
Netty's
- * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame 
before
- * giving it to our {@link KvStateServerHandler}.
- *
- * <p>Connections are established and closed by the client. The server only
- * closes the connection on a fatal failure that cannot be recovered. A
- * server-side connection close is considered a failure by the client.
+ * The default implementation of the {@link KvStateServer}.
  */
-public class KvStateServerImpl implements KvStateServer {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateServer.class);
+@Internal
+public class KvStateServerImpl extends 
AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements 
KvStateServer {
 
-       /** Server config: low water mark. */
-       private static final int LOW_WATER_MARK = 8 * 1024;
+       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateServerImpl.class);
 
-       /** Server config: high water mark. */
-       private static final int HIGH_WATER_MARK = 32 * 1024;
+       /** The {@link KvStateRegistry} to query for state instances. */
+       private final KvStateRegistry kvStateRegistry;
 
-       /** Netty's ServerBootstrap. */
-       private final ServerBootstrap bootstrap;
+       private final KvStateRequestStats stats;
 
-       /** Query executor thread pool. */
-       private final ExecutorService queryExecutor;
-
-       /** Address of this server. */
-       private KvStateServerAddress serverAddress;
+       private MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer;
 
        /**
-        * Creates the {@link KvStateServer}.
+        * Creates the state server.
+        *
+        * <p>The server is instantiated using reflection by the
+        * {@link 
org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress,
 int, int, int, KvStateRegistry, KvStateRequestStats)
+        * QueryableStateUtils.createKvStateServer(InetAddress, int, int, int, 
KvStateRegistry, KvStateRequestStats)}.
         *
         * <p>The server needs to be started via {@link #start()} in order to 
bind
         * to the configured bind address.
         *
-        * @param bindAddress         Address to bind to
-        * @param bindPort            Port to bind to. Pick random port if 0.
-        * @param numEventLoopThreads Number of event loop threads
-        * @param numQueryThreads     Number of query threads
-        * @param kvStateRegistry     KvStateRegistry to query for KvState 
instances
-        * @param stats               Statistics tracker
+        * @param bindAddress the address to listen to.
+        * @param bindPort the port to listen to.
+        * @param numEventLoopThreads number of event loop threads.
+        * @param numQueryThreads number of query threads.
+        * @param kvStateRegistry {@link KvStateRegistry} to query for state 
instances.
+        * @param stats the statistics collector.
         */
        public KvStateServerImpl(
-                       InetAddress bindAddress,
-                       Integer bindPort,
-                       Integer numEventLoopThreads,
-                       Integer numQueryThreads,
-                       KvStateRegistry kvStateRegistry,
-                       KvStateRequestStats stats) {
-
-               Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, 
"Port " + bindPort +
-                               " is out of valid port range (0-65536).");
-
-               Preconditions.checkArgument(numEventLoopThreads >= 1, 
"Non-positive number of event loop threads.");
-               Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive 
number of query threads.");
-
-               Preconditions.checkNotNull(kvStateRegistry, "KvStateRegistry");
-               Preconditions.checkNotNull(stats, "KvStateRequestStats");
-
-               NettyBufferPool bufferPool = new 
NettyBufferPool(numEventLoopThreads);
-
-               ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                               .setDaemon(true)
-                               .setNameFormat("Flink KvStateServer EventLoop 
Thread %d")
-                               .build();
-
-               NioEventLoopGroup nioGroup = new 
NioEventLoopGroup(numEventLoopThreads, threadFactory);
-
-               queryExecutor = createQueryExecutor(numQueryThreads);
-
-               // Shared between all channels
-               KvStateServerHandler serverHandler = new KvStateServerHandler(
-                               kvStateRegistry,
-                               queryExecutor,
-                               stats);
-
-               bootstrap = new ServerBootstrap()
-                               // Bind address and port
-                               .localAddress(bindAddress, bindPort)
-                               // NIO server channels
-                               .group(nioGroup)
-                               .channel(NioServerSocketChannel.class)
-                               // Server channel Options
-                               .option(ChannelOption.ALLOCATOR, bufferPool)
-                               // Child channel options
-                               .childOption(ChannelOption.ALLOCATOR, 
bufferPool)
-                               
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
-                               
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
-                               // See initializer for pipeline details
-                               .childHandler(new 
KvStateServerChannelInitializer(serverHandler));
+                       final InetAddress bindAddress,
+                       final Integer bindPort,
+                       final Integer numEventLoopThreads,
+                       final Integer numQueryThreads,
+                       final KvStateRegistry kvStateRegistry,
+                       final KvStateRequestStats stats) {
+
+               super("Queryable State Server", bindAddress, bindPort, 
numEventLoopThreads, numQueryThreads);
+               this.stats = Preconditions.checkNotNull(stats);
+               this.kvStateRegistry = 
Preconditions.checkNotNull(kvStateRegistry);
        }
 
        @Override
-       public void start() throws InterruptedException {
-               Channel channel = bootstrap.bind().sync().channel();
-
-               InetSocketAddress localAddress = (InetSocketAddress) 
channel.localAddress();
-               serverAddress = new 
KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+       public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> 
initializeHandler() {
+               this.serializer = new MessageSerializer<>(
+                               new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+                               new 
KvStateResponse.KvStateResponseDeserializer());
+               return new KvStateServerHandler(this, kvStateRegistry, 
serializer, stats);
        }
 
-       @Override
-       public KvStateServerAddress getAddress() {
-               if (serverAddress == null) {
-                       throw new IllegalStateException("KvStateServer not 
started yet.");
-               }
-
-               return serverAddress;
+       public MessageSerializer<KvStateInternalRequest, KvStateResponse> 
getSerializer() {
+               Preconditions.checkState(serializer != null, "Server " + 
getServerName() + " has not been started.");
+               return serializer;
        }
 
        @Override
-       public void shutDown() {
-               if (bootstrap != null) {
-                       EventLoopGroup group = bootstrap.group();
-                       if (group != null) {
-                               group.shutdownGracefully(0, 10, 
TimeUnit.SECONDS);
-                       }
-               }
-
-               if (queryExecutor != null) {
-                       queryExecutor.shutdown();
-               }
-
-               serverAddress = null;
+       public void start() throws InterruptedException {
+               super.start();
        }
 
-       /**
-        * Creates a thread pool for the query execution.
-        *
-        * @param numQueryThreads Number of query threads.
-        * @return Thread pool for query execution
-        */
-       private static ExecutorService createQueryExecutor(int numQueryThreads) 
{
-               ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                               .setDaemon(true)
-                               .setNameFormat("Flink KvStateServer Query 
Thread %d")
-                               .build();
-
-               return Executors.newFixedThreadPool(numQueryThreads, 
threadFactory);
+       @Override
+       public KvStateServerAddress getServerAddress() {
+               return super.getServerAddress();
        }
 
-       /**
-        * Channel pipeline initializer.
-        *
-        * <p>The request handler is shared, whereas the other handlers are 
created
-        * per channel.
-        */
-       private static final class KvStateServerChannelInitializer extends 
ChannelInitializer<SocketChannel> {
-
-               /** The shared request handler. */
-               private final KvStateServerHandler sharedRequestHandler;
-
-               /**
-                * Creates the channel pipeline initializer with the shared 
request handler.
-                *
-                * @param sharedRequestHandler Shared request handler.
-                */
-               public KvStateServerChannelInitializer(KvStateServerHandler 
sharedRequestHandler) {
-                       this.sharedRequestHandler = 
Preconditions.checkNotNull(sharedRequestHandler, "Request handler");
-               }
-
-               @Override
-               protected void initChannel(SocketChannel ch) throws Exception {
-                       ch.pipeline()
-                                       .addLast(new ChunkedWriteHandler())
-                                       .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-                                       .addLast(sharedRequestHandler);
-               }
+       @Override
+       public void shutdown() {
+               super.shutdown();
        }
-
 }

Reply via email to