http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 27257d7..005c874 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -25,306 +25,117 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.Client;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.Recover;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.ConnectException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * Client for queryable state.
+ * Client for querying Flink's managed state.
  *
  * <p>You can mark state as queryable via {@link 
StateDescriptor#setQueryable(String)}.
- * The state instance created from this descriptor will be published for 
queries
- * when it's created on the TaskManagers and the location will be reported to
- * the JobManager.
+ * The state instance created from this descriptor will be published for 
queries when it's
+ * created on the Task Managers and the location will be reported to the Job 
Manager.
  *
- * <p>The client resolves the location of the requested KvState via the
- * JobManager. Resolved locations are cached. When the server address of the
- * requested KvState instance is determined, the client sends out a request to
- * the server.
+ * <p>The client connects to a {@link 
org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}
+ * running on a given Task Manager. The proxy is the entry point of the client 
to the Flink cluster.
+ * It forwards the requests of the client to the Job Manager and the required 
Task Manager, and forwards
+ * the final response back to the client.
+ *
+ * <p>The proxy initially resolves the location of the requested KvState via 
the JobManager. Resolved
+ * locations are cached. When the server address of the requested KvState 
instance is determined, the
+ * client sends out a request to the server. The returned final answer is then 
forwarded to the Client.
  */
+@PublicEvolving
 public class QueryableStateClient {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(QueryableStateClient.class);
 
-       /**
-        * {@link KvStateLocation} lookup to resolve the address of KvState 
instances.
-        */
-       private final KvStateLocationLookupService lookupService;
-
-       /**
-        * Network client for queries against {@link KvStateServer} instances.
-        */
-       private final KvStateClient kvStateClient;
-
-       /**
-        * Execution context.
-        */
-       private final ExecutionContext executionContext;
+       /** The client that forwards the requests to the proxy. */
+       private final Client<KvStateRequest, KvStateResponse> client;
 
-       /**
-        * Cache for {@link KvStateLocation} instances keyed by job and name.
-        */
-       private final ConcurrentMap<Tuple2<JobID, String>, 
Future<KvStateLocation>> lookupCache =
-                       new ConcurrentHashMap<>();
-
-       /** This is != null, if we started the actor system. */
-       private final ActorSystem actorSystem;
+       /** The address of the proxy this client is connected to. */
+       private final KvStateServerAddress remoteAddress;
 
+       /** The execution configuration used to instantiate the different 
(de-)serializers. */
        private ExecutionConfig executionConfig;
 
        /**
-        * Creates a client from the given configuration.
-        *
-        * <p>This will create multiple Thread pools: one for the started actor
-        * system and another for the network client.
-        *
-        * @param config Configuration to use.
-        * @throws Exception Failures are forwarded
+        * Create the Queryable State Client.
+        * @param remoteHostname the hostname of the {@link 
org.apache.flink.runtime.query.KvStateClientProxy proxy}
+        *                       to connect to.
+        * @param remotePort the port of the proxy to connect to.
         */
-       public QueryableStateClient(Configuration config) throws Exception {
-               this(config, 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                               config, Executors.directExecutor(), 
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
+       public QueryableStateClient(final String remoteHostname, final int 
remotePort) throws UnknownHostException {
+               
this(InetAddress.getByName(Preconditions.checkNotNull(remoteHostname)), 
remotePort);
        }
 
        /**
-        * Creates a client from the given configuration.
-        *
-        * <p>This will create multiple Thread pools: one for the started actor
-        * system and another for the network client.
-        *
-        * @param config Configuration to use.
-        * @param highAvailabilityServices Service factory for high 
availability services
-        * @throws Exception Failures are forwarded
-        *
-        * @deprecated This constructor is deprecated and stays only for 
backwards compatibility. Use the
-        * {@link #QueryableStateClient(Configuration)} instead.
+        * Create the Queryable State Client.
+        * @param remoteAddress the {@link InetAddress address} of the
+        *                      {@link 
org.apache.flink.runtime.query.KvStateClientProxy proxy} to connect to.
+        * @param remotePort the port of the proxy to connect to.
         */
-       @Deprecated
-       public QueryableStateClient(
-                       Configuration config,
-                       HighAvailabilityServices highAvailabilityServices) 
throws Exception {
-               Preconditions.checkNotNull(config, "Configuration");
-
-               // Create a leader retrieval service
-               LeaderRetrievalService leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-
-               // Get the ask timeout
-               String askTimeoutString = 
config.getString(AkkaOptions.ASK_TIMEOUT);
-
-               Duration timeout = FiniteDuration.apply(askTimeoutString);
-               if (!timeout.isFinite()) {
-                       throw new 
IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key()
-                                       + " is not a finite timeout ('" + 
askTimeoutString + "')");
-               }
-
-               FiniteDuration askTimeout = (FiniteDuration) timeout;
-
-               int lookupRetries = 
config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRIES);
-               int lookupRetryDelayMillis = 
config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRY_DELAY);
-
-               // Retries if no JobManager is around
-               AkkaKvStateLocationLookupService.LookupRetryStrategyFactory 
retryStrategy =
-                               new 
AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory(
-                                               lookupRetries,
-                                               
FiniteDuration.apply(lookupRetryDelayMillis, "ms"));
+       public QueryableStateClient(final InetAddress remoteAddress, final int 
remotePort) {
+               Preconditions.checkArgument(remotePort >= 0 && remotePort <= 
65536,
+                               "Remote Port " + remotePort + " is out of valid 
port range (0-65536).");
 
-               // Create the actor system
-               @SuppressWarnings("unchecked")
-               Option<Tuple2<String, Object>> remoting = new Some(new 
Tuple2<>("", 0));
-               this.actorSystem = AkkaUtils.createActorSystem(config, 
remoting);
+               this.remoteAddress = new KvStateServerAddress(remoteAddress, 
remotePort);
 
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               actorSystem,
-                               askTimeout,
-                               retryStrategy);
+               final MessageSerializer<KvStateRequest, KvStateResponse> 
messageSerializer =
+                               new MessageSerializer<>(
+                                               new 
KvStateRequest.KvStateRequestDeserializer(),
+                                               new 
KvStateResponse.KvStateResponseDeserializer());
 
-               int numEventLoopThreads = 
config.getInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS);
-
-               if (numEventLoopThreads == 0) {
-                       numEventLoopThreads = 
Runtime.getRuntime().availableProcessors();
-               }
-
-               // Create the network client
-               KvStateClient networkClient = new KvStateClient(
-                               numEventLoopThreads,
+               this.client = new Client<>(
+                               "Queryable State Client",
+                               Hardware.getNumberCPUCores(),
+                               messageSerializer,
                                new DisabledKvStateRequestStats());
-
-               this.lookupService = lookupService;
-               this.kvStateClient = networkClient;
-               this.executionContext = actorSystem.dispatcher();
-               this.executionConfig = new ExecutionConfig();
-
-               this.lookupService.start();
-       }
-
-       /** Gets the {@link ExecutionConfig}. */
-       public ExecutionConfig getExecutionConfig() {
-               return executionConfig;
        }
 
-       /** Sets the {@link ExecutionConfig}. */
-       public void setExecutionConfig(ExecutionConfig config) {
-               this.executionConfig = config;
-       }
-
-       /**
-        * Creates a client.
-        *
-        * @param lookupService    Location lookup service
-        * @param kvStateClient    Network client for queries
-        * @param executionContext Execution context for futures
-        */
-       public QueryableStateClient(
-                       KvStateLocationLookupService lookupService,
-                       KvStateClient kvStateClient,
-                       ExecutionContext executionContext) {
-
-               this.lookupService = Preconditions.checkNotNull(lookupService, 
"KvStateLocationLookupService");
-               this.kvStateClient = Preconditions.checkNotNull(kvStateClient, 
"KvStateClient");
-               this.executionContext = 
Preconditions.checkNotNull(executionContext, "ExecutionContext");
-               this.actorSystem = null;
-
-               this.lookupService.start();
+       /** Shuts down the client. */
+       public void shutdown() {
+               client.shutdown();
        }
 
        /**
-        * Returns the execution context of this client.
-        *
-        * @return The execution context used by the client.
+        * Gets the {@link ExecutionConfig}.
         */
-       public ExecutionContext getExecutionContext() {
-               return executionContext;
-       }
-
-       /**
-        * Shuts down the client and all components.
-        */
-       public void shutDown() {
-               try {
-                       lookupService.shutDown();
-               } catch (Throwable t) {
-                       LOG.error("Failed to shut down KvStateLookupService", 
t);
-               }
-
-               try {
-                       kvStateClient.shutDown();
-               } catch (Throwable t) {
-                       LOG.error("Failed to shut down KvStateClient", t);
-               }
-
-               if (actorSystem != null) {
-                       try {
-                               actorSystem.shutdown();
-                       } catch (Throwable t) {
-                               LOG.error("Failed to shut down ActorSystem", t);
-                       }
-               }
+       public ExecutionConfig getExecutionConfig() {
+               return executionConfig;
        }
 
        /**
-        * Returns a future holding the serialized request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
-        * @param jobId                     JobID of the job the queryable state
-        *                                  belongs to
-        * @param queryableStateName        Name under which the state is 
queryable
-        * @param keyHashCode               Integer hash code of the key 
(result of
-        *                                  a call to {@link Object#hashCode()}
-        * @param serializedKeyAndNamespace Serialized key and namespace to 
query
-        *                                  KvState instance with
-        * @return Future holding the serialized result
-        */
-       @SuppressWarnings("unchecked")
-       public Future<byte[]> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final int keyHashCode,
-                       final byte[] serializedKeyAndNamespace) {
-
-               return getKvState(jobId, queryableStateName, keyHashCode, 
serializedKeyAndNamespace, false)
-                               .recoverWith(new Recover<Future<byte[]>>() {
-                                       @Override
-                                       public Future<byte[]> recover(Throwable 
failure) throws Throwable {
-                                               if (failure instanceof 
UnknownKvStateID ||
-                                                               failure 
instanceof UnknownKvStateKeyGroupLocation ||
-                                                               failure 
instanceof UnknownKvStateLocation ||
-                                                               failure 
instanceof ConnectException) {
-                                                       // These failures are 
likely to be caused by out-of-sync
-                                                       // KvStateLocation. 
Therefore we retry this query and
-                                                       // force look up the 
location.
-                                                       return getKvState(
-                                                                       jobId,
-                                                                       
queryableStateName,
-                                                                       
keyHashCode,
-                                                                       
serializedKeyAndNamespace,
-                                                                       true);
-                                               } else {
-                                                       return 
Futures.failed(failure);
-                                               }
-                                       }
-                               }, executionContext);
+        * Replaces the existing {@link ExecutionConfig} (possibly {@code 
null}) with the provided one.
+        * @param config The new configuration.
+        * @return The old configuration, or {@code null} if none was specified.
+        */
+       public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
+               ExecutionConfig prev = executionConfig;
+               this.executionConfig = config;
+               return prev;
        }
 
        /**
-        * Returns a future holding the request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
+        * Returns a future holding the request result.
         * @param jobId                     JobID of the job the queryable 
state belongs to.
         * @param queryableStateName        Name under which the state is 
queryable.
         * @param key                               The key we are interested 
in.
@@ -333,7 +144,7 @@ public class QueryableStateClient {
         * @return Future holding the result.
         */
        @PublicEvolving
-       public <K, V> Future<V> getKvState(
+       public <K, V> CompletableFuture<V> getKvState(
                        final JobID jobId,
                        final String queryableStateName,
                        final K key,
@@ -347,16 +158,7 @@ public class QueryableStateClient {
        }
 
        /**
-        * Returns a future holding the request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
+        * Returns a future holding the request result.
         * @param jobId                     JobID of the job the queryable 
state belongs to.
         * @param queryableStateName        Name under which the state is 
queryable.
         * @param key                               The key we are interested 
in.
@@ -365,30 +167,19 @@ public class QueryableStateClient {
         * @return Future holding the result.
         */
        @PublicEvolving
-       public <K, V> Future<V> getKvState(
+       public <K, V> CompletableFuture<V> getKvState(
                        final JobID jobId,
                        final String queryableStateName,
                        final K key,
                        final TypeInformation<K> keyTypeInfo,
                        final StateDescriptor<?, V> stateDescriptor) {
 
-               Preconditions.checkNotNull(keyTypeInfo);
-
                return getKvState(jobId, queryableStateName, key, 
VoidNamespace.INSTANCE,
                                keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor);
        }
 
        /**
         * Returns a future holding the request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
         * @param jobId                     JobID of the job the queryable 
state belongs to.
         * @param queryableStateName        Name under which the state is 
queryable.
         * @param key                               The key that the state we 
request is associated with.
@@ -399,7 +190,7 @@ public class QueryableStateClient {
         * @return Future holding the result.
         */
        @PublicEvolving
-       public <K, V, N> Future<V> getKvState(
+       public <K, V, N> CompletableFuture<V> getKvState(
                        final JobID jobId,
                        final String queryableStateName,
                        final K key,
@@ -420,15 +211,6 @@ public class QueryableStateClient {
 
        /**
         * Returns a future holding the request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
         * @param jobId                     JobID of the job the queryable 
state belongs to.
         * @param queryableStateName        Name under which the state is 
queryable.
         * @param key                               The key that the state we 
request is associated with.
@@ -439,7 +221,7 @@ public class QueryableStateClient {
         * @return Future holding the result.
         */
        @PublicEvolving
-       public <K, V, N> Future<V> getKvState(
+       public <K, N, V> CompletableFuture<V> getKvState(
                        final JobID jobId,
                        final String queryableStateName,
                        final K key,
@@ -448,8 +230,8 @@ public class QueryableStateClient {
                        final TypeInformation<N> namespaceTypeInfo,
                        final TypeSerializer<V> stateSerializer) {
 
+               Preconditions.checkNotNull(jobId);
                Preconditions.checkNotNull(queryableStateName);
-
                Preconditions.checkNotNull(key);
                Preconditions.checkNotNull(namespace);
 
@@ -457,36 +239,25 @@ public class QueryableStateClient {
                Preconditions.checkNotNull(namespaceTypeInfo);
                Preconditions.checkNotNull(stateSerializer);
 
-               if (stateSerializer instanceof ListSerializer) {
-                       throw new IllegalArgumentException("ListState is not 
supported out-of-the-box yet.");
-               }
-
                TypeSerializer<K> keySerializer = 
keyTypeInfo.createSerializer(executionConfig);
                TypeSerializer<N> namespaceSerializer = 
namespaceTypeInfo.createSerializer(executionConfig);
 
                final byte[] serializedKeyAndNamespace;
                try {
-                       serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
-                                       key,
-                                       keySerializer,
-                                       namespace,
-                                       namespaceSerializer);
+                       serializedKeyAndNamespace = KvStateSerializer
+                                       .serializeKeyAndNamespace(key, 
keySerializer, namespace, namespaceSerializer);
                } catch (IOException e) {
-                       return Futures.failed(e);
+                       return FutureUtils.getFailedFuture(e);
                }
 
-               return getKvState(jobId, queryableStateName, key.hashCode(), 
serializedKeyAndNamespace)
-                               .flatMap(new Mapper<byte[], Future<V>>() {
-                                       @Override
-                                       public Future<V> apply(byte[] 
parameter) {
-                                               try {
-                                                       return 
Futures.successful(
-                                                                       
KvStateSerializer.deserializeValue(parameter, stateSerializer));
-                                               } catch (IOException e) {
-                                                       return 
Futures.failed(e);
-                                               }
+               return getKvState(jobId, queryableStateName, key.hashCode(), 
serializedKeyAndNamespace).thenApply(
+                               stateResponse -> {
+                                       try {
+                                               return 
KvStateSerializer.deserializeValue(stateResponse.getContent(), stateSerializer);
+                                       } catch (IOException e) {
+                                               throw new 
FlinkRuntimeException(e);
                                        }
-                               }, executionContext);
+                               });
        }
 
        /**
@@ -499,92 +270,20 @@ public class QueryableStateClient {
         *                                  a call to {@link Object#hashCode()}
         * @param serializedKeyAndNamespace Serialized key and namespace to 
query
         *                                  KvState instance with
-        * @param forceLookup               Flag to force lookup of the {@link 
KvStateLocation}
         * @return Future holding the serialized result
         */
-       private Future<byte[]> getKvState(
+       private CompletableFuture<KvStateResponse> getKvState(
                        final JobID jobId,
                        final String queryableStateName,
                        final int keyHashCode,
-                       final byte[] serializedKeyAndNamespace,
-                       boolean forceLookup) {
-
-               return getKvStateLookupInfo(jobId, queryableStateName, 
forceLookup)
-                               .flatMap(new Mapper<KvStateLocation, 
Future<byte[]>>() {
-                                       @Override
-                                       public Future<byte[]> 
apply(KvStateLocation lookup) {
-                                               int keyGroupIndex = 
KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, 
lookup.getNumKeyGroups());
-
-                                               KvStateServerAddress 
serverAddress = lookup.getKvStateServerAddress(keyGroupIndex);
-                                               if (serverAddress == null) {
-                                                       return 
Futures.failed(new UnknownKvStateKeyGroupLocation());
-                                               } else {
-                                                       // Query server
-                                                       KvStateID kvStateId = 
lookup.getKvStateID(keyGroupIndex);
-                                                       return 
kvStateClient.getKvState(serverAddress, kvStateId, serializedKeyAndNamespace);
-                                               }
-                                       }
-                               }, executionContext);
-       }
-
-       /**
-        * Lookup the {@link KvStateLocation} for the given job and queryable 
state
-        * name.
-        *
-        * <p>The job manager will be queried for the location only if forced 
or no
-        * cached location can be found. There are no guarantees about
-        *
-        * @param jobId              JobID the state instance belongs to.
-        * @param queryableStateName Name under which the state instance has 
been published.
-        * @param forceUpdate        Flag to indicate whether to force a update 
via the lookup service.
-        * @return Future holding the KvStateLocation
-        */
-       private Future<KvStateLocation> getKvStateLookupInfo(
-                       JobID jobId,
-                       final String queryableStateName,
-                       boolean forceUpdate) {
-
-               if (forceUpdate) {
-                       Future<KvStateLocation> lookupFuture = lookupService
-                                       .getKvStateLookupInfo(jobId, 
queryableStateName);
-                       lookupCache.put(new Tuple2<>(jobId, 
queryableStateName), lookupFuture);
-                       return lookupFuture;
-               } else {
-                       Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, 
queryableStateName);
-                       final Future<KvStateLocation> cachedFuture = 
lookupCache.get(cacheKey);
-
-                       if (cachedFuture == null) {
-                               Future<KvStateLocation> lookupFuture = 
lookupService
-                                               .getKvStateLookupInfo(jobId, 
queryableStateName);
-
-                               Future<KvStateLocation> previous = 
lookupCache.putIfAbsent(cacheKey, lookupFuture);
-                               if (previous == null) {
-                                       return lookupFuture;
-                               } else {
-                                       return previous;
-                               }
-                       } else {
-                               // do not retain futures which failed as they 
will remain in
-                               // the cache even if the error cause is not 
present any more
-                               // and a new lookup may succeed
-                               if (cachedFuture.isCompleted() &&
-                                               
cachedFuture.value().get().isFailure()) {
-                                       // issue a new lookup
-                                       Future<KvStateLocation> lookupFuture = 
lookupService
-                                                       
.getKvStateLookupInfo(jobId, queryableStateName);
-
-                                       // replace the existing one if it has 
not been replaced yet
-                                       // otherwise return the one in the cache
-                                       if (lookupCache.replace(cacheKey, 
cachedFuture, lookupFuture)) {
-                                               return lookupFuture;
-                                       } else {
-                                               return 
lookupCache.get(cacheKey);
-                                       }
-                               } else {
-                                       return cachedFuture;
-                               }
-                       }
+                       final byte[] serializedKeyAndNamespace) {
+               LOG.info("Sending State Request to {}.", remoteAddress);
+               try {
+                       KvStateRequest request = new KvStateRequest(jobId, 
queryableStateName, keyHashCode, serializedKeyAndNamespace);
+                       return client.sendRequest(remoteAddress, request);
+               } catch (Exception e) {
+                       LOG.error("Unable to send KVStateRequest: ", e);
+                       return FutureUtils.getFailedFuture(e);
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
new file mode 100644
index 0000000..d7191b6
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.queryablestate.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocationException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.Client;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * This handler acts as an internal (to the Flink cluster) client that receives
+ * the requests from external clients, executes them by contacting the Job Manager (if necessary) and
+ * the Task Manager holding the requested state, and forwards the answer back to the client.
+ *
+ * <p>Looked-up state locations are cached per (job, state name) pair. Failures that hint at an
+ * outdated cache entry trigger a retry of the query with a forced location lookup.
+ */
+@Internal
[email protected]
+public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class);
+
+       /** The proxy using this handler. */
+       private final KvStateClientProxy proxy;
+
+       /** A cache to hold the location of different states for which we have already seen requests. */
+       private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache =
+                       new ConcurrentHashMap<>();
+
+       /**
+        * Network client to forward queries to {@link KvStateServerImpl state server}
+        * instances inside the cluster.
+        */
+       private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient;
+
+       /**
+        * Create the handler used by the {@link KvStateClientProxyImpl}.
+        *
+        * @param proxy the {@link KvStateClientProxyImpl proxy} using the handler.
+        * @param queryExecutorThreads the number of threads handed to the internal client that forwards the queries.
+        * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
+        * @param stats server statistics collector.
+        */
+       public KvStateClientProxyHandler(
+                       final KvStateClientProxyImpl proxy,
+                       final int queryExecutorThreads,
+                       final MessageSerializer<KvStateRequest, KvStateResponse> serializer,
+                       final KvStateRequestStats stats) {
+
+               super(proxy, serializer, stats);
+               this.proxy = Preconditions.checkNotNull(proxy);
+               this.kvStateClient = createInternalClient(queryExecutorThreads);
+       }
+
+       /**
+        * Creates the client that forwards {@link KvStateInternalRequest internal requests} to the state servers.
+        * The client is created with a {@link DisabledKvStateRequestStats} collector.
+        *
+        * @param threads the thread count passed to the client.
+        * @return the newly created client.
+        */
+       private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) {
+               final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer =
+                               new MessageSerializer<>(
+                                               new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+                                               new KvStateResponse.KvStateResponseDeserializer());
+
+               return new Client<>(
+                               "Queryable State Proxy Client",
+                               threads,
+                               messageSerializer,
+                               new DisabledKvStateRequestStats());
+       }
+
+       /**
+        * Starts the query for the given request, initially allowing a cached state location to be used.
+        *
+        * @param requestId the id of the request (not used by this implementation).
+        * @param request the request describing the state to query.
+        * @return a future that is completed with the response or with the failure of the query.
+        */
+       @Override
+       public CompletableFuture<KvStateResponse> handleRequest(
+                       final long requestId,
+                       final KvStateRequest request) {
+               CompletableFuture<KvStateResponse> response = new CompletableFuture<>();
+               executeActionAsync(response, request, false);
+               return response;
+       }
+
+       /**
+        * Executes the query and propagates its outcome to {@code result}.
+        *
+        * <p>If the query fails with a cause that points to an outdated location, the query is
+        * re-issued with a forced location lookup. The number of retries is not bounded here; a
+        * retry is only skipped once {@code result} is already done.
+        *
+        * @param result the future to complete with the outcome of the query.
+        * @param request the request to execute.
+        * @param update whether the state location has to be looked up again instead of using the cache.
+        */
+       private void executeActionAsync(
+                       final CompletableFuture<KvStateResponse> result,
+                       final KvStateRequest request,
+                       final boolean update) {
+
+               if (!result.isDone()) {
+                       final CompletableFuture<KvStateResponse> operationFuture = getState(request, update);
+                       operationFuture.whenCompleteAsync(
+                                       (t, throwable) -> {
+                                               if (throwable != null) {
+                                                       if (throwable instanceof CancellationException) {
+                                                               result.completeExceptionally(throwable);
+                                                       } else if (throwable.getCause() instanceof UnknownKvStateIdException ||
+                                                                       throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
+                                                                       throwable.getCause() instanceof UnknownKvStateLocation ||
+                                                                       throwable.getCause() instanceof ConnectException) {
+
+                                                               // These failures are likely to be caused by out-of-sync
+                                                               // KvStateLocation. Therefore we retry this query and
+                                                               // force look up the location.
+
+                                                               executeActionAsync(result, request, true);
+                                                       } else {
+                                                               result.completeExceptionally(throwable);
+                                                       }
+                                               } else {
+                                                       result.complete(t);
+                                               }
+                                       }, queryExecutor);
+
+                       // Once the result is done (for whatever reason) the pending operation is no longer needed.
+                       result.whenComplete(
+                                       (t, throwable) -> operationFuture.cancel(false));
+               }
+       }
+
+       /**
+        * Resolves the location of the requested state and forwards the query to the state server
+        * registered for the key group of the requested key.
+        *
+        * @param request the request to execute.
+        * @param forceUpdate whether the state location has to be looked up again instead of using the cache.
+        * @return a future with the response of the state server; it fails with an
+        *         {@link UnknownKvStateKeyGroupLocationException} if no server address is known for the key group.
+        */
+       private CompletableFuture<KvStateResponse> getState(
+                       final KvStateRequest request,
+                       final boolean forceUpdate) {
+
+               return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate)
+                               .thenComposeAsync((Function<KvStateLocation, CompletableFuture<KvStateResponse>>) location -> {
+                                       final int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(
+                                                       request.getKeyHashCode(), location.getNumKeyGroups());
+
+                                       final KvStateServerAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex);
+                                       if (serverAddress == null) {
+                                               return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName()));
+                                       } else {
+                                               // Query server
+                                               final KvStateID kvStateId = location.getKvStateID(keyGroupIndex);
+                                               final KvStateInternalRequest internalRequest = new KvStateInternalRequest(
+                                                               kvStateId, request.getSerializedKeyAndNamespace());
+                                               return kvStateClient.sendRequest(serverAddress, internalRequest);
+                                       }
+                               }, queryExecutor);
+       }
+
+       /**
+        * Lookup the {@link KvStateLocation} for the given job and queryable state name.
+        *
+        * <p>The job manager will be queried for the location only if forced or no
+        * usable cached location can be found (a cached future that completed exceptionally is
+        * not reused). A cached location may be outdated; queries failing because of that are
+        * retried by the caller with {@code forceUpdate} set.
+        *
+        * <p>The job manager is asked with a timeout of 1000 milliseconds and the resulting
+        * future replaces any previously cached one.
+        *
+        * @param jobId              JobID the state instance belongs to.
+        * @param queryableStateName Name under which the state instance has been published.
+        * @param forceUpdate        Flag to indicate whether to force a update via the lookup service.
+        * @return Future holding the KvStateLocation
+        */
+       private CompletableFuture<KvStateLocation> getKvStateLookupInfo(
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final boolean forceUpdate) {
+
+               final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName);
+               final CompletableFuture<KvStateLocation> cachedFuture = lookupCache.get(cacheKey);
+
+               if (!forceUpdate && cachedFuture != null && !cachedFuture.isCompletedExceptionally()) {
+                       LOG.debug("Retrieving location for state={} of job={} from the cache.", queryableStateName, jobId);
+                       return cachedFuture;
+               }
+
+               LOG.debug("Retrieving location for state={} of job={} from the job manager.", queryableStateName, jobId);
+
+               return proxy.getJobManagerFuture().thenComposeAsync(
+                               jobManagerGateway -> {
+                                       final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
+                                       final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
+                                                       jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
+                                                                       .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
+
+                                       lookupCache.put(cacheKey, locationFuture);
+                                       return locationFuture;
+                               }, queryExecutor);
+       }
+
+       /** Shuts down the internal client used to forward the queries. */
+       @Override
+       public void shutdown() {
+               kvStateClient.shutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
new file mode 100644
index 0000000..bca80de
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.UnknownJobManagerException;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.util.Preconditions;
+
+import java.net.InetAddress;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The default implementation of the {@link KvStateClientProxy}.
+ *
+ * <p>It holds the future of the currently known leading job manager and creates the
+ * {@link KvStateClientProxyHandler} that serves the incoming requests.
+ */
+@Internal
+public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy {
+
+       /** Failed future handed out as long as no leading job manager has been set. */
+       private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER =
+                       FutureUtils.getFailedFuture(new UnknownJobManagerException());
+
+       /** Number of threads used to process incoming requests. */
+       private final int queryExecutorThreads;
+
+       /** Statistics collector. */
+       private final KvStateRequestStats stats;
+
+       /** Guards all accesses to {@link #jobManagerFuture}. */
+       private final Object leaderLock = new Object();
+
+       /** Future of the leading job manager, or {@link #UNKNOWN_JOB_MANAGER} if none has been set. */
+       private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
+
+       /**
+        * Creates the Queryable State Client Proxy.
+        *
+        * <p>The server is instantiated using reflection by the
+        * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats)
+        * QueryableStateUtils.createKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats)}.
+        *
+        * <p>The server needs to be started via {@link #start()} in order to bind
+        * to the configured bind address.
+        *
+        * @param bindAddress the address to listen to.
+        * @param bindPort the port to listen to.
+        * @param numEventLoopThreads number of event loop threads.
+        * @param numQueryThreads number of query threads, must be at least 1.
+        * @param stats the statistics collector, must not be {@code null}.
+        */
+       public KvStateClientProxyImpl(
+                       final InetAddress bindAddress,
+                       final Integer bindPort,
+                       final Integer numEventLoopThreads,
+                       final Integer numQueryThreads,
+                       final KvStateRequestStats stats) {
+
+               super("Queryable State Proxy Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads);
+               Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
+               this.queryExecutorThreads = numQueryThreads;
+               this.stats = Preconditions.checkNotNull(stats);
+       }
+
+       /** Delegates to the base server implementation. */
+       @Override
+       public KvStateServerAddress getServerAddress() {
+               return super.getServerAddress();
+       }
+
+       /** Delegates to the base server implementation. */
+       @Override
+       public void start() throws InterruptedException {
+               super.start();
+       }
+
+       /** Delegates to the base server implementation. */
+       @Override
+       public void shutdown() {
+               super.shutdown();
+       }
+
+       /**
+        * Replaces the future of the leading job manager.
+        *
+        * @param leadingJobManager the future of the new leader; {@code null} resets the proxy to the "unknown job manager" state.
+        */
+       @Override
+       public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception {
+               synchronized (leaderLock) {
+                       jobManagerFuture = (leadingJobManager != null) ? leadingJobManager : UNKNOWN_JOB_MANAGER;
+               }
+       }
+
+       /**
+        * Returns the future of the leading job manager as last set via {@link #updateJobManager(CompletableFuture)}.
+        *
+        * @return the current job manager future; a failed future if no leader has been set.
+        */
+       @Override
+       public CompletableFuture<ActorGateway> getJobManagerFuture() {
+               synchronized (leaderLock) {
+                       return jobManagerFuture;
+               }
+       }
+
+       /**
+        * Creates the {@link KvStateClientProxyHandler} of this proxy together with the serializer for the
+        * messages exchanged with the external clients.
+        *
+        * @return the handler serving the incoming requests.
+        */
+       @Override
+       public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler() {
+               final KvStateRequest.KvStateRequestDeserializer requestDeserializer =
+                               new KvStateRequest.KvStateRequestDeserializer();
+               final KvStateResponse.KvStateResponseDeserializer responseDeserializer =
+                               new KvStateResponse.KvStateResponseDeserializer();
+
+               final MessageSerializer<KvStateRequest, KvStateResponse> messageSerializer =
+                               new MessageSerializer<>(requestDeserializer, responseDeserializer);
+
+               return new KvStateClientProxyHandler(this, queryExecutorThreads, messageSerializer, stats);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
new file mode 100644
index 0000000..eedc2a1
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The request to be forwarded by the {@link org.apache.flink.runtime.query.KvStateClientProxy
+ * Queryable State Client Proxy} to the {@link org.apache.flink.runtime.query.KvStateServer State Server}
+ * of the Task Manager responsible for the requested state.
+ *
+ * <p>It carries the id of the requested state instance and the serialized key and namespace.
+ */
+@Internal
+public class KvStateInternalRequest extends MessageBody {
+
+       /** Id of the requested state instance. */
+       private final KvStateID kvStateId;
+
+       /** Serialized key and namespace to look up. */
+       private final byte[] serializedKeyAndNamespace;
+
+       /**
+        * Creates the request.
+        *
+        * @param stateId the id of the requested state instance, must not be {@code null}.
+        * @param serializedKeyAndNamespace the serialized key and namespace, must not be {@code null}.
+        */
+       public KvStateInternalRequest(
+                       final KvStateID stateId,
+                       final byte[] serializedKeyAndNamespace) {
+
+               this.kvStateId = Preconditions.checkNotNull(stateId);
+               this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace);
+       }
+
+       public KvStateID getKvStateId() {
+               return kvStateId;
+       }
+
+       public byte[] getSerializedKeyAndNamespace() {
+               return serializedKeyAndNamespace;
+       }
+
+       /**
+        * Writes the request as: the two longs of the state id, the length of the serialized key and
+        * namespace, and finally the serialized key and namespace itself.
+        *
+        * @return the serialized request.
+        */
+       @Override
+       public byte[] serialize() {
+
+               final int payloadLength = serializedKeyAndNamespace.length;
+
+               // KvStateId + sizeOf(serializedKeyAndNamespace) + serializedKeyAndNamespace
+               final ByteBuffer buffer = ByteBuffer.allocate(KvStateID.SIZE + Integer.BYTES + payloadLength);
+
+               buffer.putLong(kvStateId.getLowerPart());
+               buffer.putLong(kvStateId.getUpperPart());
+               buffer.putInt(payloadLength);
+               buffer.put(serializedKeyAndNamespace);
+
+               return buffer.array();
+       }
+
+       /**
+        * A {@link MessageDeserializer deserializer} for {@link KvStateInternalRequest}.
+        */
+       public static class KvStateInternalRequestDeserializer implements MessageDeserializer<KvStateInternalRequest> {
+
+               /**
+                * Reads the fields in the order in which {@link KvStateInternalRequest#serialize()} writes them.
+                *
+                * @param buf the buffer positioned at the beginning of the serialized request.
+                * @return the deserialized request.
+                */
+               @Override
+               public KvStateInternalRequest deserializeMessage(ByteBuf buf) {
+                       final long firstIdPart = buf.readLong();
+                       final long secondIdPart = buf.readLong();
+                       final KvStateID stateId = new KvStateID(firstIdPart, secondIdPart);
+
+                       final int payloadLength = buf.readInt();
+                       Preconditions.checkArgument(payloadLength >= 0,
+                                       "Negative length for key and namespace. " +
+                                                       "This indicates a serialization error.");
+
+                       final byte[] keyAndNamespace = new byte[payloadLength];
+                       if (payloadLength > 0) {
+                               buf.readBytes(keyAndNamespace);
+                       }
+                       return new KvStateInternalRequest(stateId, keyAndNamespace);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
index eb33bce..7eb39c7 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
@@ -18,72 +18,124 @@
 
 package org.apache.flink.queryablestate.messages;
 
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
 /**
- * A {@link InternalKvState} instance request for a specific key and namespace.
+ * The request to be sent by the {@link 
org.apache.flink.queryablestate.client.QueryableStateClient
+ * Queryable State Client} to the {@link 
org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}
+ * requesting a given state.
  */
-public final class KvStateRequest {
+@Internal
+public class KvStateRequest extends MessageBody {
 
-       /** ID for this request. */
-       private final long requestId;
+       private final JobID jobId;
+       private final String stateName;
+       private final int keyHashCode;
+       private final byte[] serializedKeyAndNamespace;
 
-       /** ID of the requested KvState instance. */
-       private final KvStateID kvStateId;
+       public KvStateRequest(
+                       final JobID jobId,
+                       final String stateName,
+                       final int keyHashCode,
+                       final byte[] serializedKeyAndNamespace) {
 
-       /** Serialized key and namespace to request from the KvState instance. 
*/
-       private final byte[] serializedKeyAndNamespace;
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.stateName = Preconditions.checkNotNull(stateName);
+               this.keyHashCode = keyHashCode;
+               this.serializedKeyAndNamespace = 
Preconditions.checkNotNull(serializedKeyAndNamespace);
+       }
 
-       /**
-        * Creates a KvState instance request.
-        *
-        * @param requestId                 ID for this request
-        * @param kvStateId                 ID of the requested KvState instance
-        * @param serializedKeyAndNamespace Serialized key and namespace to 
request from the KvState
-        *                                  instance
-        */
-       public KvStateRequest(long requestId, KvStateID kvStateId, byte[] 
serializedKeyAndNamespace) {
-               this.requestId = requestId;
-               this.kvStateId = Preconditions.checkNotNull(kvStateId, 
"KvStateID");
-               this.serializedKeyAndNamespace = 
Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and 
namespace");
+       public JobID getJobId() {
+               return jobId;
        }
 
-       /**
-        * Returns the request ID.
-        *
-        * @return Request ID
-        */
-       public long getRequestId() {
-               return requestId;
+       public String getStateName() {
+               return stateName;
        }
 
-       /**
-        * Returns the ID of the requested KvState instance.
-        *
-        * @return ID of the requested KvState instance
-        */
-       public KvStateID getKvStateId() {
-               return kvStateId;
+       public int getKeyHashCode() {
+               return keyHashCode;
        }
 
-       /**
-        * Returns the serialized key and namespace to request from the KvState
-        * instance.
-        *
-        * @return Serialized key and namespace to request from the KvState 
instance
-        */
        public byte[] getSerializedKeyAndNamespace() {
                return serializedKeyAndNamespace;
        }
 
        @Override
+       public byte[] serialize() {
+
+               // Encode the state name with a fixed charset: the request is written by the client and read
+               // by the proxy in a different JVM, whose platform default charset may differ.
+               byte[] serializedStateName = stateName.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+               // JobID + stateName + sizeOf(stateName) + hashCode + keyAndNamespace + sizeOf(keyAndNamespace)
+               final int size =
+                               JobID.SIZE +
+                               serializedStateName.length + Integer.BYTES +
+                               Integer.BYTES +
+                               serializedKeyAndNamespace.length + Integer.BYTES;
+
+               return ByteBuffer.allocate(size)
+                               .putLong(jobId.getLowerPart())
+                               .putLong(jobId.getUpperPart())
+                               .putInt(serializedStateName.length)
+                               .put(serializedStateName)
+                               .putInt(keyHashCode)
+                               .putInt(serializedKeyAndNamespace.length)
+                               .put(serializedKeyAndNamespace)
+                               .array();
+       }
+
+       @Override
        public String toString() {
                return "KvStateRequest{" +
-                               "requestId=" + requestId +
-                               ", kvStateId=" + kvStateId +
-                               ", serializedKeyAndNamespace.length=" + serializedKeyAndNamespace.length +
+                               "jobId=" + jobId +
+                               ", stateName='" + stateName + '\'' +
+                               ", keyHashCode=" + keyHashCode +
+                               ", serializedKeyAndNamespace=" + Arrays.toString(serializedKeyAndNamespace) +
                                '}';
        }
+
+       /**
+        * A {@link MessageDeserializer deserializer} for {@link KvStateRequest}.
+        *
+        * <p>The fields are read in the order in which {@link KvStateRequest#serialize()} writes them
+        * and the state name is decoded with the same charset (UTF-8) that is used for encoding it.
+        */
+       public static class KvStateRequestDeserializer implements MessageDeserializer<KvStateRequest> {
+
+               @Override
+               public KvStateRequest deserializeMessage(ByteBuf buf) {
+                       JobID jobId = new JobID(buf.readLong(), buf.readLong());
+
+                       int statenameLength = buf.readInt();
+                       Preconditions.checkArgument(statenameLength >= 0,
+                                       "Negative length for state name. " +
+                                                       "This indicates a serialization error.");
+
+                       String stateName = "";
+                       if (statenameLength > 0) {
+                               byte[] name = new byte[statenameLength];
+                               buf.readBytes(name);
+                               // Must match the charset used in KvStateRequest#serialize().
+                               stateName = new String(name, java.nio.charset.StandardCharsets.UTF_8);
+                       }
+
+                       int keyHashCode = buf.readInt();
+
+                       int knamespaceLength = buf.readInt();
+                       Preconditions.checkArgument(knamespaceLength >= 0,
+                                       "Negative length for key and namespace. " +
+                                                       "This indicates a serialization error.");
+
+                       byte[] serializedKeyAndNamespace = new byte[knamespaceLength];
+                       if (knamespaceLength > 0) {
+                               buf.readBytes(serializedKeyAndNamespace);
+                       }
+                       return new KvStateRequest(jobId, stateName, keyHashCode, serializedKeyAndNamespace);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java
deleted file mode 100644
index 4015d79..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.messages;
-
-/**
- * A failure response to a {@link KvStateRequest}.
- */
-public final class KvStateRequestFailure {
-
-       /** ID of the request responding to. */
-       private final long requestId;
-
-       /** Failure cause. Not allowed to be a user type. */
-       private final Throwable cause;
-
-       /**
-        * Creates a failure response to a {@link KvStateRequest}.
-        *
-        * @param requestId ID for the request responding to
-        * @param cause     Failure cause (not allowed to be a user type)
-        */
-       public KvStateRequestFailure(long requestId, Throwable cause) {
-               this.requestId = requestId;
-               this.cause = cause;
-       }
-
-       /**
-        * Returns the request ID responding to.
-        *
-        * @return Request ID responding to
-        */
-       public long getRequestId() {
-               return requestId;
-       }
-
-       /**
-        * Returns the failure cause.
-        *
-        * @return Failure cause
-        */
-       public Throwable getCause() {
-               return cause;
-       }
-
-       @Override
-       public String toString() {
-               return "KvStateRequestFailure{" +
-                               "requestId=" + requestId +
-                               ", cause=" + cause +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java
deleted file mode 100644
index 6bf2397..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.messages;
-
-import org.apache.flink.util.Preconditions;
-
-/**
- * A successful response to a {@link KvStateRequest} containing the serialized
- * result for the requested key and namespace.
- */
-public final class KvStateRequestResult {
-
-       /** ID of the request responding to. */
-       private final long requestId;
-
-       /**
-        * Serialized result for the requested key and namespace. If no result 
was
-        * available for the specified key and namespace, this is 
<code>null</code>.
-        */
-       private final byte[] serializedResult;
-
-       /**
-        * Creates a successful {@link KvStateRequestResult} response.
-        *
-        * @param requestId        ID of the request responding to
-        * @param serializedResult Serialized result or <code>null</code> if 
none
-        */
-       public KvStateRequestResult(long requestId, byte[] serializedResult) {
-               this.requestId = requestId;
-               this.serializedResult = 
Preconditions.checkNotNull(serializedResult, "Serialization result");
-       }
-
-       /**
-        * Returns the request ID responding to.
-        *
-        * @return Request ID responding to
-        */
-       public long getRequestId() {
-               return requestId;
-       }
-
-       /**
-        * Returns the serialized result or <code>null</code> if none available.
-        *
-        * @return Serialized result or <code>null</code> if none available.
-        */
-       public byte[] getSerializedResult() {
-               return serializedResult;
-       }
-
-       @Override
-       public String toString() {
-               return "KvStateRequestResult{" +
-                               "requestId=" + requestId +
-                               ", serializedResult.length=" + 
serializedResult.length +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
new file mode 100644
index 0000000..462135f
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The response containing the (serialized) state sent by the {@link 
org.apache.flink.runtime.query.KvStateServer
+ * State Server} to the {@link 
org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}, and then 
forwarded
+ * by the proxy to the original {@link 
org.apache.flink.queryablestate.client.QueryableStateClient Queryable State
+ * Client}.
+ */
+@Internal
+public class KvStateResponse extends MessageBody {
+
+       /** The serialized state carried by this response; checked non-null in the constructor. */
+       private final byte[] content;
+
+       /**
+        * Creates a response wrapping the given serialized state.
+        *
+        * @param content the serialized state; must not be {@code null}
+        */
+       public KvStateResponse(final byte[] content) {
+               this.content = Preconditions.checkNotNull(content);
+       }
+
+       /**
+        * Returns the serialized state.
+        *
+        * @return the array handed to the constructor (the same instance, not a copy)
+        */
+       public byte[] getContent() {
+               return content;
+       }
+
+       /**
+        * Serializes this response as an {@code int} length prefix followed by the content bytes.
+        *
+        * @return a newly allocated array of {@code Integer.BYTES + content.length} bytes
+        */
+       @Override
+       public byte[] serialize() {
+               final int size = Integer.BYTES + content.length;
+               return ByteBuffer.allocate(size)
+                               .putInt(content.length)
+                               .put(content)
+                               .array();
+       }
+
+       /**
+        * A {@link MessageDeserializer deserializer} for {@link KvStateResponse}.
+        *
+        * <p>Reads the layout written by {@link KvStateResponse#serialize()}: an {@code int}
+        * length followed by that many content bytes. A negative length is rejected through
+        * {@code Preconditions.checkArgument}.
+        */
+       public static class KvStateResponseDeserializer implements 
MessageDeserializer<KvStateResponse> {
+
+               @Override
+               public KvStateResponse deserializeMessage(ByteBuf buf) {
+                       int length = buf.readInt();
+                       Preconditions.checkArgument(length >= 0,
+                                       "Negative length for state content. " +
+                                                       "This indicates a 
serialization error.");
+                       byte[] content = new byte[length];
+                       buf.readBytes(content);
+
+                       return new KvStateResponse(content);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
new file mode 100644
index 0000000..4bf8e98
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The base class for every server in the queryable state module.
+ * It is using pure netty to send and receive messages of type {@link 
MessageBody}.
+ *
+ * @param <REQ> the type of request the server expects to receive.
+ * @param <RESP> the type of response the server will send.
+ */
+@Internal
+public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class);
+
+       /** AbstractServerBase config: low water mark. */
+       private static final int LOW_WATER_MARK = 8 * 1024;
+
+       /** AbstractServerBase config: high water mark. */
+       private static final int HIGH_WATER_MARK = 32 * 1024;
+
+       /** Name of this server, used in thread names and log messages. */
+       private final String serverName;
+
+       /** Netty's ServerBootstrap. */
+       private final ServerBootstrap bootstrap;
+
+       /** Query executor thread pool. */
+       private final ExecutorService queryExecutor;
+
+       /** Address of this server; {@code null} before {@link #start()} and after {@link #shutdown()}. */
+       private KvStateServerAddress serverAddress;
+
+       /** The handler used for the incoming messages; created in {@link #start()}. */
+       private AbstractServerHandler<REQ, RESP> handler;
+
+       /**
+        * Creates the {@link AbstractServerBase}.
+        *
+        * <p>The server needs to be started via {@link #start()} in order to bind
+        * to the configured bind address.
+        *
+        * @param serverName the name of the server
+        * @param bindAddress address to bind to
+        * @param bindPort port to bind to (random port if 0)
+        * @param numEventLoopThreads number of event loop threads
+        * @param numQueryThreads number of threads in the query executor pool
+        */
+       protected AbstractServerBase(
+                       final String serverName,
+                       final InetAddress bindAddress,
+                       final Integer bindPort,
+                       final Integer numEventLoopThreads,
+                       final Integer numQueryThreads) {
+
+               Preconditions.checkNotNull(bindAddress);
+               // The highest valid port is 65535; 65536 used to pass this check and was
+               // then rejected with an unrelated message when the socket address was built.
+               Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65535, "Port " + bindPort + " out of valid range (0-65535).");
+               Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
+               Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
+
+               this.serverName = Preconditions.checkNotNull(serverName);
+               // Must run after serverName is assigned: the thread names are derived from it.
+               this.queryExecutor = createQueryExecutor(numQueryThreads);
+
+               final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
+
+               final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                               .setDaemon(true)
+                               .setNameFormat("Flink " + serverName + " EventLoop Thread %d")
+                               .build();
+
+               final NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
+
+               bootstrap = new ServerBootstrap()
+                               // Bind address and port
+                               .localAddress(bindAddress, bindPort)
+                               // NIO server channels
+                               .group(nioGroup)
+                               .channel(NioServerSocketChannel.class)
+                               // AbstractServerBase channel Options
+                               .option(ChannelOption.ALLOCATOR, bufferPool)
+                               // Child channel options
+                               .childOption(ChannelOption.ALLOCATOR, bufferPool)
+                               .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
+                               .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
+       }
+
+       /**
+        * Creates a thread pool for the query execution.
+        *
+        * <p>Called from the constructor, so it reads the {@code serverName} field directly
+        * instead of going through the overridable {@link #getServerName()}.
+        *
+        * @param numQueryThreads Number of query threads.
+        * @return Thread pool for query execution
+        */
+       private ExecutorService createQueryExecutor(int numQueryThreads) {
+               ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                               .setDaemon(true)
+                               .setNameFormat("Flink " + serverName + " Thread %d")
+                               .build();
+
+               return Executors.newFixedThreadPool(numQueryThreads, threadFactory);
+       }
+
+       /**
+        * Returns the thread pool on which queries are executed.
+        *
+        * @return the query executor created in the constructor
+        */
+       protected ExecutorService getQueryExecutor() {
+               return queryExecutor;
+       }
+
+       /**
+        * Returns the name of this server.
+        *
+        * @return the server name passed to the constructor
+        */
+       public String getServerName() {
+               return serverName;
+       }
+
+       /**
+        * Creates the handler for incoming messages. Called once from {@link #start()};
+        * the returned handler is shared by all channels.
+        *
+        * @return the handler to install in every child channel pipeline
+        */
+       public abstract AbstractServerHandler<REQ, RESP> initializeHandler();
+
+       /**
+        * Starts the server by binding to the configured bind address (blocking).
+        *
+        * @throws InterruptedException If interrupted during the bind operation
+        * @throws IllegalStateException If the server has already been started
+        */
+       public void start() throws InterruptedException {
+               Preconditions.checkState(serverAddress == null,
+                               "Server " + serverName + " has already been started @ " + serverAddress + '.');
+
+               this.handler = initializeHandler();
+               bootstrap.childHandler(new ServerChannelInitializer<>(handler));
+
+               Channel channel = bootstrap.bind().sync().channel();
+               // Read the address back from the channel: with port 0 the actual port is only known after binding.
+               InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
+               serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+
+               LOG.info("Started server {} @ {}", serverName, serverAddress);
+       }
+
+       /**
+        * Returns the address of this server.
+        *
+        * @return AbstractServerBase address
+        * @throws IllegalStateException If server has not been started yet
+        */
+       public KvStateServerAddress getServerAddress() {
+               Preconditions.checkState(serverAddress != null, "Server " + serverName + " has not been started.");
+               return serverAddress;
+       }
+
+       /**
+        * Shuts down the server and all related thread pools.
+        *
+        * <p>The handler is shut down first, then the query executor, then the event loop
+        * group. The event loop shutdown is only initiated, not awaited.
+        */
+       public void shutdown() {
+               LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
+
+               // The handler only exists once start() has been called.
+               if (handler != null) {
+                       handler.shutdown();
+               }
+
+               if (queryExecutor != null) {
+                       queryExecutor.shutdown();
+               }
+
+               if (bootstrap != null) {
+                       EventLoopGroup group = bootstrap.group();
+                       if (group != null) {
+                               group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+                       }
+               }
+               serverAddress = null;
+       }
+
+       /**
+        * Channel pipeline initializer.
+        *
+        * <p>The request handler is shared, whereas the other handlers are created
+        * per channel.
+        */
+       private static final class ServerChannelInitializer<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInitializer<SocketChannel> {
+
+               /** The shared request handler. */
+               private final AbstractServerHandler<REQ, RESP> sharedRequestHandler;
+
+               /**
+                * Creates the channel pipeline initializer with the shared request handler.
+                *
+                * @param sharedRequestHandler Shared request handler.
+                */
+               ServerChannelInitializer(AbstractServerHandler<REQ, RESP> sharedRequestHandler) {
+                       this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "MessageBody handler");
+               }
+
+               @Override
+               protected void initChannel(SocketChannel channel) throws Exception {
+                       channel.pipeline()
+                                       .addLast(new ChunkedWriteHandler())
+                                       // Frames carry a 4-byte length field at offset 0, which is stripped before the handler sees them.
+                                       .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+                                       .addLast(sharedRequestHandler);
+               }
+       }
+
+       /**
+        * Tells whether the query executor has been shut down.
+        *
+        * @return {@code true} if {@code shutdown()} has been invoked on the query executor
+        */
+       @VisibleForTesting
+       public boolean isExecutorShutdown() {
+               return queryExecutor.isShutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
new file mode 100644
index 0000000..b9bf671
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The base class of every handler used by an {@link AbstractServerBase}.
+ *
+ * @param <REQ> the type of request the server expects to receive.
+ * @param <RESP> the type of response the server will send.
+ */
+@Internal
+@ChannelHandler.Sharable
+public abstract class AbstractServerHandler<REQ extends MessageBody, RESP 
extends MessageBody> extends ChannelInboundHandlerAdapter {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(AbstractServerHandler.class);
+
+       /** The owning server of this handler. */
+       private final AbstractServerBase<REQ, RESP> server;
+
+       /** The serializer used to (de-)serialize messages. */
+       private final MessageSerializer<REQ, RESP> serializer;
+
+       /** Thread pool for query execution. */
+       protected final ExecutorService queryExecutor;
+
+       /** Exposed server statistics. */
+       private final KvStateRequestStats stats;
+
+       /**
+        * Creates the handler for the given server.
+        *
+        * @param server the server owning this handler; it also supplies the query executor
+        * @param serializer the serializer used to (de-)serialize messages
+        * @param stats statistics collector
+        */
+       public AbstractServerHandler(
+                       final AbstractServerBase<REQ, RESP> server,
+                       final MessageSerializer<REQ, RESP> serializer,
+                       final KvStateRequestStats stats) {
+
+               // Validate and resolve everything first, in the original order, then publish to the fields.
+               final AbstractServerBase<REQ, RESP> owner = Preconditions.checkNotNull(server);
+               final MessageSerializer<REQ, RESP> codec = Preconditions.checkNotNull(serializer);
+               final ExecutorService executor = owner.getQueryExecutor();
+               final KvStateRequestStats collector = Preconditions.checkNotNull(stats);
+
+               this.server = owner;
+               this.serializer = codec;
+               this.queryExecutor = executor;
+               this.stats = collector;
+       }
+
+       /**
+        * Returns the name of the server that owns this handler.
+        *
+        * @return the owning server's name
+        */
+       protected String getServerName() {
+               return server.getServerName();
+       }
+
+       /**
+        * Reports a newly active connection to the statistics collector.
+        *
+        * @param ctx context of the channel that became active (not used)
+        */
+       @Override
+       public void channelActive(ChannelHandlerContext ctx) throws Exception {
+               stats.reportActiveConnection();
+       }
+
+       /**
+        * Reports a connection that became inactive to the statistics collector.
+        *
+        * @param ctx context of the channel that became inactive (not used)
+        */
+       @Override
+       public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
+               stats.reportInactiveConnection();
+       }
+
+       /**
+        * Decodes an incoming message and dispatches it.
+        *
+        * <p>A message of type {@link MessageType#REQUEST} is deserialized, counted in the
+        * stats and submitted to the query executor as an {@code AsyncRequestTask}. Any other
+        * message type is answered with a server failure. If anything in this method throws,
+        * a request failure is written back when the request had already been deserialized,
+        * and a server failure otherwise. The incoming buffer is released in every case.
+        *
+        * @param ctx context of the channel the message arrived on
+        * @param msg the received message; it is cast to {@link ByteBuf}
+        */
+       @Override
+       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+               REQ request = null;
+               long requestId = -1L;
+
+               try {
+                       final ByteBuf buf = (ByteBuf) msg;
+                       final MessageType msgType = 
MessageSerializer.deserializeHeader(buf);
+
+                       requestId = MessageSerializer.getRequestId(buf);
+
+                       if (msgType == MessageType.REQUEST) {
+
+                               // 
------------------------------------------------------------
+                               // MessageBody
+                               // 
------------------------------------------------------------
+                               request = serializer.deserializeRequest(buf);
+                               stats.reportRequest();
+
+                               // Execute actual query async, because it is 
possibly
+                               // blocking (e.g. file I/O).
+                               //
+                               // A submission failure is not treated as 
fatal. todo here if there is a shared resource e.g. registry, then I will have 
to sync on that.
+                               queryExecutor.submit(new 
AsyncRequestTask<>(this, ctx, requestId, request, stats));
+
+                       } else {
+                               // 
------------------------------------------------------------
+                               // Unexpected
+                               // 
------------------------------------------------------------
+
+                               final String errMsg = "Unexpected message type 
" + msgType + ". Expected " + MessageType.REQUEST + ".";
+                               final ByteBuf failure = 
MessageSerializer.serializeServerFailure(ctx.alloc(), new 
IllegalArgumentException(errMsg));
+
+                               LOG.debug(errMsg);
+                               ctx.writeAndFlush(failure);
+                       }
+               } catch (Throwable t) {
+                       final String stringifiedCause = 
ExceptionUtils.stringifyException(t);
+
+                       String errMsg;
+                       ByteBuf err;
+                       if (request != null) {
+                               errMsg = "Failed request with ID " + requestId 
+ ". Caused by: " + stringifiedCause;
+                               err = 
MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new 
RuntimeException(errMsg));
+                               stats.reportFailedRequest();
+                       } else {
+                               errMsg = "Failed incoming message. Caused by: " 
+ stringifiedCause;
+                               err = 
MessageSerializer.serializeServerFailure(ctx.alloc(), new 
RuntimeException(errMsg));
+                       }
+
+                       LOG.debug(errMsg);
+                       ctx.writeAndFlush(err);
+
+               } finally {
+                       // IMPORTANT: We have to always recycle the incoming 
buffer.
+                       // Otherwise we will leak memory out of Netty's buffer 
pool.
+                       //
+                       // If any operation ever holds on to the buffer, it is 
the
+                       // responsibility of that operation to retain the 
buffer and
+                       // release it later.
+                       ReferenceCountUtil.release(msg);
+               }
+       }
+
+       /**
+        * Reports an exception raised in the channel pipeline back to the remote side.
+        *
+        * <p>The stringified cause is sent as a server failure message and the channel is
+        * closed once that write has completed.
+        *
+        * @param ctx context of the channel in which the exception surfaced
+        * @param cause the exception that was caught
+        */
+       @Override
+       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+               final String msg = "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(cause);
+               // serializeServerFailure is static: call it on the class, as channelRead does,
+               // rather than through the serializer instance.
+               final ByteBuf err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg));
+
+               LOG.debug(msg);
+               ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
+       }
+
+       /**
+        * Handles an incoming request and returns a {@link CompletableFuture}
+        * containing the corresponding response.
+        *
+        * <p><b>NOTE:</b> This method is called by multiple threads: it is invoked from
+        * the tasks that {@code channelRead} submits to the query executor. A future that
+        * completes with {@code null} is treated by the calling task as a bad request.
+        *
+        * @param requestId the id of the received request to be handled.
+        * @param request the request to be handled.
+        * @return A future with the response to be forwarded to the client.
+        */
+       public abstract CompletableFuture<RESP> handleRequest(final long 
requestId, final REQ request);
+
+       /**
+        * Shuts down any handler specific resources, e.g. thread pools etc.
+        *
+        * <p>Invoked by {@link AbstractServerBase#shutdown()} before the server stops its
+        * own query executor and event loop group.
+        */
+       public abstract void shutdown();
+
+       /**
+        * Task that forwards a single request to the owning {@link AbstractServerHandler}
+        * and writes the outcome back to the client channel.
+        *
+        * <p>On success the serialized response is written, wrapped in a
+        * {@link ChunkedByteBuf} if it is larger than the channel's write buffer high
+        * watermark. On failure a request failure message is written instead and the
+        * failure is reported to the {@link KvStateRequestStats}.
+        *
+        * @param <REQ> type of the incoming request.
+        * @param <RESP> type of the response sent back to the client.
+        */
+       private static class AsyncRequestTask<REQ extends MessageBody, RESP extends MessageBody> implements Runnable {
+
+               /** Handler that actually processes the request. */
+               private final AbstractServerHandler<REQ, RESP> handler;
+
+               /** Context of the channel the request arrived on; also used for the reply. */
+               private final ChannelHandlerContext ctx;
+
+               /** Id of the request, echoed back in the response or failure message. */
+               private final long requestId;
+
+               /** The request to be handled. */
+               private final REQ request;
+
+               /** Sink for success/failure statistics. */
+               private final KvStateRequestStats stats;
+
+               /** {@link System#nanoTime()} at construction, used to compute the request duration. */
+               private final long creationNanos;
+
+               /**
+                * Creates the task and records its creation time.
+                *
+                * @param handler the handler processing the request; must not be {@code null}.
+                * @param ctx the channel context to reply on; must not be {@code null}.
+                * @param requestId the id of the request.
+                * @param request the request to handle; must not be {@code null}.
+                * @param stats the statistics collector; must not be {@code null}.
+                */
+               AsyncRequestTask(
+                               final AbstractServerHandler<REQ, RESP> handler,
+                               final ChannelHandlerContext ctx,
+                               final long requestId,
+                               final REQ request,
+                               final KvStateRequestStats stats) {
+
+                       this.handler = Preconditions.checkNotNull(handler);
+                       this.ctx = Preconditions.checkNotNull(ctx);
+                       this.requestId = requestId;
+                       this.request = Preconditions.checkNotNull(request);
+                       this.stats = Preconditions.checkNotNull(stats);
+                       this.creationNanos = System.nanoTime();
+               }
+
+               /**
+                * Hands the request to the handler and, once the returned future completes,
+                * writes either the response or a failure message to the channel.
+                *
+                * <p>Does nothing if the channel is no longer active.
+                */
+               @Override
+               public void run() {
+
+                       if (!ctx.channel().isActive()) {
+                               return;
+                       }
+
+                       handler.handleRequest(requestId, request).whenComplete((resp, throwable) -> {
+                               try {
+                                       if (throwable != null) {
+                                               // Unwrap a CompletionException, but only if it actually carries a
+                                               // cause: "throw null" would raise an unrelated NullPointerException
+                                               // and hide the real failure from the client.
+                                               final Throwable cause = throwable.getCause();
+                                               throw (throwable instanceof CompletionException && cause != null)
+                                                               ? cause
+                                                               : throwable;
+                                       }
+
+                                       if (resp == null) {
+                                               throw new BadRequestException(handler.getServerName(), "NULL returned for request with ID " + requestId + ".");
+                                       }
+
+                                       final ByteBuf serialResp = MessageSerializer.serializeResponse(ctx.alloc(), requestId, resp);
+
+                                       int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
+
+                                       // Responses larger than the high watermark are written in chunks.
+                                       ChannelFuture write;
+                                       if (serialResp.readableBytes() <= highWatermark) {
+                                               write = ctx.writeAndFlush(serialResp);
+                                       } else {
+                                               write = ctx.writeAndFlush(new ChunkedByteBuf(serialResp, highWatermark));
+                                       }
+                                       write.addListener(new RequestWriteListener());
+
+                               } catch (BadRequestException e) {
+                                       // Bad requests are forwarded to the client as they are.
+                                       try {
+                                               stats.reportFailedRequest();
+                                               final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, e);
+                                               ctx.writeAndFlush(err);
+                                       } catch (IOException io) {
+                                               LOG.error("Failed to respond with the error after failed request", io);
+                                       }
+                               } catch (Throwable t) {
+                                       // Any other failure is sent as a RuntimeException carrying the stringified cause.
+                                       try {
+                                               stats.reportFailedRequest();
+
+                                               final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+                                               final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
+                                               ctx.writeAndFlush(err);
+                                       } catch (IOException io) {
+                                               LOG.error("Failed to respond with the error after failed request", io);
+                                       }
+                               }
+                       });
+               }
+
+               @Override
+               public String toString() {
+                       return "AsyncRequestTask{" +
+                                       "requestId=" + requestId +
+                                       ", request=" + request +
+                                       '}';
+               }
+
+               /**
+                * Callback after query result has been written.
+                *
+                * <p>Gathers stats and logs errors. The reported duration is measured from the
+                * creation of the enclosing task.
+                */
+               private class RequestWriteListener implements ChannelFutureListener {
+
+                       @Override
+                       public void operationComplete(ChannelFuture future) throws Exception {
+                               long durationNanos = System.nanoTime() - creationNanos;
+                               long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
+
+                               if (future.isSuccess()) {
+                                       LOG.debug("Request {} was successfully answered after {} ms.", request, durationMillis);
+                                       stats.reportSuccessfulRequest(durationMillis);
+                               } else {
+                                       LOG.debug("Request {} failed after {} ms : ", request, durationMillis, future.cause());
+                                       stats.reportFailedRequest();
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
new file mode 100644
index 0000000..3c0c484
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for exceptions thrown during querying Flink's managed state.
+ *
+ * <p>The exception message always starts with the name of the server that raised it.
+ */
+@Internal
+public class BadRequestException extends Exception {
+
+       private static final long serialVersionUID = 3458743952407632903L;
+
+       /**
+        * Creates the exception with a message of the form {@code "<serverName> : <message>"}.
+        *
+        * @param serverName name of the server raising the exception; must not be {@code null}.
+        * @param message description of the failure.
+        */
+       public BadRequestException(String serverName, String message) {
+               super(prefixWithServerName(serverName, message));
+       }
+
+       /** Puts the (non-null) server name in front of the given message. */
+       private static String prefixWithServerName(String serverName, String message) {
+               final String checkedName = Preconditions.checkNotNull(serverName);
+               return checkedName + " : " + message;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
new file mode 100644
index 0000000..9c56025
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler},
+ * respecting the high and low watermarks.
+ *
+ * <p>All chunks are slices of the wrapped buffer and share its reference count.
+ *
+ * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
+ */
+@Internal
+public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
+
+       /** The buffer to chunk. */
+       private final ByteBuf buf;
+
+       /** Size of chunks. */
+       private final int chunkSize;
+
+       /** Closed flag. */
+       private boolean isClosed;
+
+       /** End of input flag. */
+       private boolean isEndOfInput;
+
+       /**
+        * Creates a chunked view over the given buffer.
+        *
+        * @param buf the buffer to hand out in chunks; must not be {@code null}.
+        * @param chunkSize maximum number of bytes per chunk; must be positive.
+        */
+       public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
+               this.buf = Preconditions.checkNotNull(buf, "Buffer");
+               Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size");
+               this.chunkSize = chunkSize;
+       }
+
+       /**
+        * Returns {@code true} once the last chunk has been handed out or the input has been closed.
+        */
+       @Override
+       public boolean isEndOfInput() throws Exception {
+               return isClosed || isEndOfInput;
+       }
+
+       /**
+        * Closes the input. Calling it more than once has no further effect.
+        */
+       @Override
+       public void close() throws Exception {
+               if (!isClosed) {
+                       // If we did not consume the whole buffer yet, we have to release
+                       // it here. Otherwise, it's the responsibility of the consumer.
+                       if (!isEndOfInput) {
+                               buf.release();
+                       }
+
+                       isClosed = true;
+               }
+       }
+
+       /**
+        * Returns the next chunk of at most {@code chunkSize} bytes.
+        *
+        * @param ctx the channel handler context (not used).
+        * @return the next chunk, or {@code null} if the input is closed or the last
+        *         chunk has already been returned.
+        */
+       @Override
+       public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+               if (isClosed || isEndOfInput) {
+                       // Nothing left to hand out. Without the end-of-input check a repeated
+                       // call would return the final slice a second time; as it shares the
+                       // reference count of the buffer, the consumer would release it twice.
+                       return null;
+               } else if (buf.readableBytes() <= chunkSize) {
+                       isEndOfInput = true;
+
+                       // Don't retain as the consumer is responsible to release it
+                       return buf.slice();
+               } else {
+                       // Return a chunk sized slice of the buffer. The ref count is
+                       // shared with the original buffer. That's why we need to retain
+                       // a reference here.
+                       return buf.readSlice(chunkSize).retain();
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "ChunkedByteBuf{" +
+                               "buf=" + buf +
+                               ", chunkSize=" + chunkSize +
+                               ", isClosed=" + isClosed +
+                               ", isEndOfInput=" + isEndOfInput +
+                               '}';
+       }
+}

Reply via email to