http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
deleted file mode 100644
index 70bccf0..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.Client;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
-import org.apache.flink.runtime.util.Hardware;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Client for querying Flink's managed state.
- *
- * <p>You can mark state as queryable via {@link 
StateDescriptor#setQueryable(String)}.
- * The state instance created from this descriptor will be published for 
queries when it's
- * created on the Task Managers and the location will be reported to the Job 
Manager.
- *
- * <p>The client connects to a {@link 
org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}
- * running on a given Task Manager. The proxy is the entry point of the client 
to the Flink cluster.
- * It forwards the requests of the client to the Job Manager and the required 
Task Manager, and forwards
- * the final response back the client.
- *
- * <p>The proxy, initially resolves the location of the requested KvState via 
the JobManager. Resolved
- * locations are cached. When the server address of the requested KvState 
instance is determined, the
- * client sends out a request to the server. The returned final answer is then 
forwarded to the Client.
- */
-@PublicEvolving
-public class QueryableStateClient {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(QueryableStateClient.class);
-
-       /** The client that forwards the requests to the proxy. */
-       private final Client<KvStateRequest, KvStateResponse> client;
-
-       /** The address of the proxy this client is connected to. */
-       private final KvStateServerAddress remoteAddress;
-
-       /** The execution configuration used to instantiate the different 
(de-)serializers. */
-       private ExecutionConfig executionConfig;
-
-       /**
-        * Create the Queryable State Client.
-        * @param remoteHostname the hostname of the {@link 
org.apache.flink.runtime.query.KvStateClientProxy proxy}
-        *                       to connect to.
-        * @param remotePort the port of the proxy to connect to.
-        */
-       public QueryableStateClient(final String remoteHostname, final int 
remotePort) throws UnknownHostException {
-               
this(InetAddress.getByName(Preconditions.checkNotNull(remoteHostname)), 
remotePort);
-       }
-
-       /**
-        * Create the Queryable State Client.
-        * @param remoteAddress the {@link InetAddress address} of the
-        *                      {@link 
org.apache.flink.runtime.query.KvStateClientProxy proxy} to connect to.
-        * @param remotePort the port of the proxy to connect to.
-        */
-       public QueryableStateClient(final InetAddress remoteAddress, final int 
remotePort) {
-               Preconditions.checkArgument(remotePort >= 0 && remotePort <= 
65536,
-                               "Remote Port " + remotePort + " is out of valid 
port range (0-65536).");
-
-               this.remoteAddress = new KvStateServerAddress(remoteAddress, 
remotePort);
-
-               final MessageSerializer<KvStateRequest, KvStateResponse> 
messageSerializer =
-                               new MessageSerializer<>(
-                                               new 
KvStateRequest.KvStateRequestDeserializer(),
-                                               new 
KvStateResponse.KvStateResponseDeserializer());
-
-               this.client = new Client<>(
-                               "Queryable State Client",
-                               Hardware.getNumberCPUCores(),
-                               messageSerializer,
-                               new DisabledKvStateRequestStats());
-       }
-
-       /** Shuts down the client. */
-       public void shutdown() {
-               client.shutdown();
-       }
-
-       /**
-        * Gets the {@link ExecutionConfig}.
-        */
-       public ExecutionConfig getExecutionConfig() {
-               return executionConfig;
-       }
-
-       /**
-        * Replaces the existing {@link ExecutionConfig} (possibly {@code 
null}), with the provided one.
-        * @param config The new {@code configuration}.
-        * @return The old configuration, or {@code null} if none was specified.
-        * */
-       public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
-               ExecutionConfig prev = executionConfig;
-               this.executionConfig = config;
-               return prev;
-       }
-
-       /**
-        * Returns a future holding the request result.  *
-        * @param jobId                     JobID of the job the queryable 
state belongs to.
-        * @param queryableStateName        Name under which the state is 
queryable.
-        * @param key                               The key we are interested 
in.
-        * @param keyTypeHint                           A {@link TypeHint} used 
to extract the type of the key.
-        * @param stateDescriptor                       The {@link 
StateDescriptor} of the state we want to query.
-        * @return Future holding the immutable {@link State} object containing 
the result.
-        */
-       @PublicEvolving
-       public <K, S extends State, V> CompletableFuture<S> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final K key,
-                       final TypeHint<K> keyTypeHint,
-                       final StateDescriptor<S, V> stateDescriptor) {
-
-               Preconditions.checkNotNull(keyTypeHint);
-
-               TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo();
-               return getKvState(jobId, queryableStateName, key, keyTypeInfo, 
stateDescriptor);
-       }
-
-       /**
-        * Returns a future holding the request result.  *
-        * @param jobId                     JobID of the job the queryable 
state belongs to.
-        * @param queryableStateName        Name under which the state is 
queryable.
-        * @param key                               The key we are interested 
in.
-        * @param keyTypeInfo                           The {@link 
TypeInformation} of the key.
-        * @param stateDescriptor                       The {@link 
StateDescriptor} of the state we want to query.
-        * @return Future holding the immutable {@link State} object containing 
the result.
-        */
-       @PublicEvolving
-       public <K, S extends State, V> CompletableFuture<S> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final K key,
-                       final TypeInformation<K> keyTypeInfo,
-                       final StateDescriptor<S, V> stateDescriptor) {
-
-               return getKvState(jobId, queryableStateName, key, 
VoidNamespace.INSTANCE,
-                               keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor);
-       }
-
-       /**
-        * Returns a future holding the request result.
-        * @param jobId                     JobID of the job the queryable 
state belongs to.
-        * @param queryableStateName        Name under which the state is 
queryable.
-        * @param key                               The key that the state we 
request is associated with.
-        * @param namespace                                     The namespace 
of the state.
-        * @param keyTypeInfo                           The {@link 
TypeInformation} of the keys.
-        * @param namespaceTypeInfo                     The {@link 
TypeInformation} of the namespace.
-        * @param stateDescriptor                       The {@link 
StateDescriptor} of the state we want to query.
-        * @return Future holding the immutable {@link State} object containing 
the result.
-        */
-       @PublicEvolving
-       public <K, N, S extends State, V> CompletableFuture<S> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final K key,
-                       final N namespace,
-                       final TypeInformation<K> keyTypeInfo,
-                       final TypeInformation<N> namespaceTypeInfo,
-                       final StateDescriptor<S, V> stateDescriptor) {
-
-               Preconditions.checkNotNull(jobId);
-               Preconditions.checkNotNull(queryableStateName);
-               Preconditions.checkNotNull(key);
-               Preconditions.checkNotNull(namespace);
-
-               Preconditions.checkNotNull(keyTypeInfo);
-               Preconditions.checkNotNull(namespaceTypeInfo);
-               Preconditions.checkNotNull(stateDescriptor);
-
-               TypeSerializer<K> keySerializer = 
keyTypeInfo.createSerializer(executionConfig);
-               TypeSerializer<N> namespaceSerializer = 
namespaceTypeInfo.createSerializer(executionConfig);
-
-               final byte[] serializedKeyAndNamespace;
-               try {
-                       serializedKeyAndNamespace = KvStateSerializer
-                                       .serializeKeyAndNamespace(key, 
keySerializer, namespace, namespaceSerializer);
-               } catch (IOException e) {
-                       return FutureUtils.getFailedFuture(e);
-               }
-
-               return getKvState(jobId, queryableStateName, key.hashCode(), 
serializedKeyAndNamespace).thenApply(
-                               stateResponse -> {
-                                       try {
-                                               return stateDescriptor.bind(new 
ImmutableStateBinder(stateResponse.getContent()));
-                                       } catch (Exception e) {
-                                               throw new 
FlinkRuntimeException(e);
-                                       }
-                               });
-       }
-
-       /**
-        * Returns a future holding the serialized request result.
-        *
-        * @param jobId                     JobID of the job the queryable state
-        *                                  belongs to
-        * @param queryableStateName        Name under which the state is 
queryable
-        * @param keyHashCode               Integer hash code of the key 
(result of
-        *                                  a call to {@link Object#hashCode()}
-        * @param serializedKeyAndNamespace Serialized key and namespace to 
query
-        *                                  KvState instance with
-        * @return Future holding the serialized result
-        */
-       private CompletableFuture<KvStateResponse> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final int keyHashCode,
-                       final byte[] serializedKeyAndNamespace) {
-               LOG.info("Sending State Request to {}.", remoteAddress);
-               try {
-                       KvStateRequest request = new KvStateRequest(jobId, 
queryableStateName, keyHashCode, serializedKeyAndNamespace);
-                       return client.sendRequest(remoteAddress, request);
-               } catch (Exception e) {
-                       LOG.error("Unable to send KVStateRequest: ", e);
-                       return FutureUtils.getFailedFuture(e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
deleted file mode 100644
index d7191b6..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.proxy;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.queryablestate.UnknownKvStateIdException;
-import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocationException;
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.AbstractServerHandler;
-import org.apache.flink.queryablestate.network.Client;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.query.KvStateClientProxy;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * This handler acts as an internal (to the Flink cluster) client that receives
- * the requests from external clients, executes them by contacting the Job 
Manager (if necessary) and
- * the Task Manager holding the requested state, and forwards the answer back 
to the client.
- */
-@Internal
[email protected]
-public class KvStateClientProxyHandler extends 
AbstractServerHandler<KvStateRequest, KvStateResponse> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateClientProxyHandler.class);
-
-       /** The proxy using this handler. */
-       private final KvStateClientProxy proxy;
-
-       /** A cache to hold the location of different states for which we have 
already seen requests. */
-       private final ConcurrentMap<Tuple2<JobID, String>, 
CompletableFuture<KvStateLocation>> lookupCache =
-                       new ConcurrentHashMap<>();
-
-       /**
-        * Network client to forward queries to {@link KvStateServerImpl state 
server}
-        * instances inside the cluster.
-        */
-       private final Client<KvStateInternalRequest, KvStateResponse> 
kvStateClient;
-
-       /**
-        * Create the handler used by the {@link KvStateClientProxyImpl}.
-        *
-        * @param proxy the {@link KvStateClientProxyImpl proxy} using the 
handler.
-        * @param queryExecutorThreads the number of threads used to process 
incoming requests.
-        * @param serializer the {@link MessageSerializer} used to (de-) 
serialize the different messages.
-        * @param stats server statistics collector.
-        */
-       public KvStateClientProxyHandler(
-                       final KvStateClientProxyImpl proxy,
-                       final int queryExecutorThreads,
-                       final MessageSerializer<KvStateRequest, 
KvStateResponse> serializer,
-                       final KvStateRequestStats stats) {
-
-               super(proxy, serializer, stats);
-               this.proxy = Preconditions.checkNotNull(proxy);
-               this.kvStateClient = createInternalClient(queryExecutorThreads);
-       }
-
-       private static Client<KvStateInternalRequest, KvStateResponse> 
createInternalClient(int threads) {
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> messageSerializer =
-                               new MessageSerializer<>(
-                                               new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(),
-                                               new 
KvStateResponse.KvStateResponseDeserializer());
-
-               return new Client<>(
-                               "Queryable State Proxy Client",
-                               threads,
-                               messageSerializer,
-                               new DisabledKvStateRequestStats());
-       }
-
-       @Override
-       public CompletableFuture<KvStateResponse> handleRequest(
-                       final long requestId,
-                       final KvStateRequest request) {
-               CompletableFuture<KvStateResponse> response = new 
CompletableFuture<>();
-               executeActionAsync(response, request, false);
-               return response;
-       }
-
-       private void executeActionAsync(
-                       final CompletableFuture<KvStateResponse> result,
-                       final KvStateRequest request,
-                       final boolean update) {
-
-               if (!result.isDone()) {
-                       final CompletableFuture<KvStateResponse> 
operationFuture = getState(request, update);
-                       operationFuture.whenCompleteAsync(
-                                       (t, throwable) -> {
-                                               if (throwable != null) {
-                                                       if (throwable 
instanceof CancellationException) {
-                                                               
result.completeExceptionally(throwable);
-                                                       } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
-                                                                       
throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
-                                                                       
throwable.getCause() instanceof UnknownKvStateLocation ||
-                                                                       
throwable.getCause() instanceof ConnectException) {
-
-                                                               // These 
failures are likely to be caused by out-of-sync
-                                                               // 
KvStateLocation. Therefore we retry this query and
-                                                               // force look 
up the location.
-
-                                                               
executeActionAsync(result, request, true);
-                                                       } else {
-                                                               
result.completeExceptionally(throwable);
-                                                       }
-                                               } else {
-                                                       result.complete(t);
-                                               }
-                                       }, queryExecutor);
-
-                       result.whenComplete(
-                                       (t, throwable) -> 
operationFuture.cancel(false));
-               }
-       }
-
-       private CompletableFuture<KvStateResponse> getState(
-                       final KvStateRequest request,
-                       final boolean forceUpdate) {
-
-               return getKvStateLookupInfo(request.getJobId(), 
request.getStateName(), forceUpdate)
-                               .thenComposeAsync((Function<KvStateLocation, 
CompletableFuture<KvStateResponse>>) location -> {
-                                       final int keyGroupIndex = 
KeyGroupRangeAssignment.computeKeyGroupForKeyHash(
-                                                       
request.getKeyHashCode(), location.getNumKeyGroups());
-
-                                       final KvStateServerAddress 
serverAddress = location.getKvStateServerAddress(keyGroupIndex);
-                                       if (serverAddress == null) {
-                                               return 
FutureUtils.getFailedFuture(new 
UnknownKvStateKeyGroupLocationException(getServerName()));
-                                       } else {
-                                               // Query server
-                                               final KvStateID kvStateId = 
location.getKvStateID(keyGroupIndex);
-                                               final KvStateInternalRequest 
internalRequest = new KvStateInternalRequest(
-                                                               kvStateId, 
request.getSerializedKeyAndNamespace());
-                                               return 
kvStateClient.sendRequest(serverAddress, internalRequest);
-                                       }
-                               }, queryExecutor);
-       }
-
-       /**
-        * Lookup the {@link KvStateLocation} for the given job and queryable 
state name.
-        *
-        * <p>The job manager will be queried for the location only if forced 
or no
-        * cached location can be found. There are no guarantees about
-        *
-        * @param jobId              JobID the state instance belongs to.
-        * @param queryableStateName Name under which the state instance has 
been published.
-        * @param forceUpdate        Flag to indicate whether to force a update 
via the lookup service.
-        * @return Future holding the KvStateLocation
-        */
-       private CompletableFuture<KvStateLocation> getKvStateLookupInfo(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final boolean forceUpdate) {
-
-               final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, 
queryableStateName);
-               final CompletableFuture<KvStateLocation> cachedFuture = 
lookupCache.get(cacheKey);
-
-               if (!forceUpdate && cachedFuture != null && 
!cachedFuture.isCompletedExceptionally()) {
-                       LOG.debug("Retrieving location for state={} of job={} 
from the cache.", jobId, queryableStateName);
-                       return cachedFuture;
-               }
-
-               LOG.debug("Retrieving location for state={} of job={} from the 
job manager.", jobId, queryableStateName);
-
-               return proxy.getJobManagerFuture().thenComposeAsync(
-                               jobManagerGateway -> {
-                                       final Object msg = new 
KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
-                                       final 
CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
-                                                       
jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
-                                                                       
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
-
-                                       lookupCache.put(cacheKey, 
locationFuture);
-                                       return locationFuture;
-                               }, queryExecutor);
-       }
-
-       @Override
-       public void shutdown() {
-               kvStateClient.shutdown();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
deleted file mode 100644
index 196641d..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.proxy;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.UnknownJobManagerException;
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.AbstractServerBase;
-import org.apache.flink.queryablestate.network.AbstractServerHandler;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.query.KvStateClientProxy;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.util.Preconditions;
-
-import java.net.InetAddress;
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The default implementation of the {@link KvStateClientProxy}.
- */
-@Internal
-public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, 
KvStateResponse> implements KvStateClientProxy {
-
-       private static final CompletableFuture<ActorGateway> 
UNKNOWN_JOB_MANAGER =
-                       FutureUtils.getFailedFuture(new 
UnknownJobManagerException());
-
-       /** Number of threads used to process incoming requests. */
-       private final int queryExecutorThreads;
-
-       /** Statistics collector. */
-       private final KvStateRequestStats stats;
-
-       private final Object leaderLock = new Object();
-
-       private CompletableFuture<ActorGateway> jobManagerFuture = 
UNKNOWN_JOB_MANAGER;
-
-       /**
-        * Creates the Queryable State Client Proxy.
-        *
-        * <p>The server is instantiated using reflection by the
-        * {@link 
org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress,
 Iterator, int, int, KvStateRequestStats)
-        * QueryableStateUtils.createKvStateClientProxy(InetAddress, Iterator, 
int, int, KvStateRequestStats)}.
-        *
-        * <p>The server needs to be started via {@link #start()} in order to 
bind
-        * to the configured bind address.
-        *
-        * @param bindAddress the address to listen to.
-        * @param bindPortIterator the port range to try to bind to.
-        * @param numEventLoopThreads number of event loop threads.
-        * @param numQueryThreads number of query threads.
-        * @param stats the statistics collector.
-        */
-       public KvStateClientProxyImpl(
-                       final InetAddress bindAddress,
-                       final Iterator<Integer> bindPortIterator,
-                       final Integer numEventLoopThreads,
-                       final Integer numQueryThreads,
-                       final KvStateRequestStats stats) {
-
-               super("Queryable State Proxy Server", bindAddress, 
bindPortIterator, numEventLoopThreads, numQueryThreads);
-               Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive 
number of query threads.");
-               this.queryExecutorThreads = numQueryThreads;
-               this.stats = Preconditions.checkNotNull(stats);
-       }
-
-       @Override
-       public KvStateServerAddress getServerAddress() {
-               return super.getServerAddress();
-       }
-
-       @Override
-       public void start() throws Throwable {
-               super.start();
-       }
-
-       @Override
-       public void shutdown() {
-               super.shutdown();
-       }
-
-       @Override
-       public void updateJobManager(CompletableFuture<ActorGateway> 
leadingJobManager) throws Exception {
-               synchronized (leaderLock) {
-                       if (leadingJobManager == null) {
-                               jobManagerFuture = UNKNOWN_JOB_MANAGER;
-                       } else {
-                               jobManagerFuture = leadingJobManager;
-                       }
-               }
-       }
-
-       @Override
-       public CompletableFuture<ActorGateway> getJobManagerFuture() {
-               synchronized (leaderLock) {
-                       return jobManagerFuture;
-               }
-       }
-
-       @Override
-       public AbstractServerHandler<KvStateRequest, KvStateResponse> 
initializeHandler() {
-               MessageSerializer<KvStateRequest, KvStateResponse> serializer =
-                               new MessageSerializer<>(
-                                               new 
KvStateRequest.KvStateRequestDeserializer(),
-                                               new 
KvStateResponse.KvStateResponseDeserializer());
-               return new KvStateClientProxyHandler(this, 
queryExecutorThreads, serializer, stats);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
deleted file mode 100644
index b853cfc..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * A read-only {@link AggregatingState} that <b>does not</b> allow for 
modifications.
- *
- * <p>This is the type of the result returned when querying Flink's keyed 
state using the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
- * providing an {@link AggregatingStateDescriptor}.
- */
-@PublicEvolving
-public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState 
implements AggregatingState<IN, OUT> {
-
-       private final OUT value;
-
-       private ImmutableAggregatingState(OUT value) {
-               this.value = Preconditions.checkNotNull(value);
-       }
-
-       @Override
-       public OUT get() {
-               return value;
-       }
-
-       @Override
-       public void add(Object newValue) {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       @Override
-       public void clear() {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> 
createState(
-                       final AggregatingStateDescriptor<IN, ACC, OUT> 
stateDescriptor,
-                       final byte[] serializedValue) throws IOException {
-
-               final ACC accumulator = KvStateSerializer.deserializeValue(
-                               serializedValue,
-                               stateDescriptor.getSerializer());
-
-               final OUT state = 
stateDescriptor.getAggregateFunction().getResult(accumulator);
-               return new ImmutableAggregatingState<>(state);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
deleted file mode 100644
index a12adaa..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * A read-only {@link FoldingState} that does not allow for modifications.
- *
- * <p>This is the result returned when querying Flink's keyed state using the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
- * providing an {@link FoldingStateDescriptor}.
- */
-@PublicEvolving
-@Deprecated
-public final class ImmutableFoldingState<IN, ACC> extends ImmutableState 
implements FoldingState<IN, ACC> {
-
-       private final ACC value;
-
-       private ImmutableFoldingState(ACC value) {
-               this.value = Preconditions.checkNotNull(value);
-       }
-
-       @Override
-       public ACC get() {
-               return value;
-       }
-
-       @Override
-       public void add(Object newValue) {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       @Override
-       public void clear() {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
-                       final FoldingStateDescriptor<IN, ACC> stateDescriptor,
-                       final byte[] serializedState) throws IOException {
-
-               final ACC state = KvStateSerializer.deserializeValue(
-                               serializedState,
-                               stateDescriptor.getSerializer());
-               return new ImmutableFoldingState<>(state);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
deleted file mode 100644
index 8416905..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A read-only {@link ListState} that does not allow for modifications.
- *
- * <p>This is the result returned when querying Flink's keyed state using the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
- * providing an {@link ListStateDescriptor}.
- */
-@PublicEvolving
-public final class ImmutableListState<V> extends ImmutableState implements 
ListState<V> {
-
-       private final List<V> listState;
-
-       private ImmutableListState(final List<V> state) {
-               this.listState = Preconditions.checkNotNull(state);
-       }
-
-       @Override
-       public Iterable<V> get() {
-               return listState;
-       }
-
-       @Override
-       public void add(V value) {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       @Override
-       public void clear() {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       public static <V> ImmutableListState<V> createState(
-                       final ListStateDescriptor<V> stateDescriptor,
-                       final byte[] serializedState) throws IOException {
-
-               final List<V> state = KvStateSerializer.deserializeList(
-                               serializedState,
-                               stateDescriptor.getElementSerializer());
-               return new ImmutableListState<>(state);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
deleted file mode 100644
index c216d5d..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A read-only {@link MapState} that does not allow for modifications.
- *
- * <p>This is the result returned when querying Flink's keyed state using the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
- * providing an {@link MapStateDescriptor}.
- */
-@PublicEvolving
-public final class ImmutableMapState<K, V> extends ImmutableState implements 
MapState<K, V> {
-
-       private final Map<K, V> state;
-
-       private ImmutableMapState(final Map<K, V> mapState) {
-               this.state = Preconditions.checkNotNull(mapState);
-       }
-
-       @Override
-       public V get(K key) {
-               return state.get(key);
-       }
-
-       @Override
-       public void put(K key, V value) {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       @Override
-       public void putAll(Map<K, V> map) {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       @Override
-       public void remove(K key) {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       @Override
-       public boolean contains(K key) {
-               return state.containsKey(key);
-       }
-
-       /**
-        * Returns all the mappings in the state in a {@link 
Collections#unmodifiableSet(Set)}.
-        *
-        * @return A read-only iterable view of all the key-value pairs in the 
state.
-        *
-        * @throws Exception Thrown if the system cannot access the state.
-        */
-       @Override
-       public Iterable<Map.Entry<K, V>> entries() {
-               return Collections.unmodifiableSet(state.entrySet());
-       }
-
-       /**
-        * Returns all the keys in the state in a {@link 
Collections#unmodifiableSet(Set)}.
-        *
-        * @return A read-only iterable view of all the keys in the state.
-        *
-        * @throws Exception Thrown if the system cannot access the state.
-        */
-       @Override
-       public Iterable<K> keys() {
-               return Collections.unmodifiableSet(state.keySet());
-       }
-
-       /**
-        * Returns all the values in the state in a {@link 
Collections#unmodifiableCollection(Collection)}.
-        *
-        * @return A read-only iterable view of all the values in the state.
-        *
-        * @throws Exception Thrown if the system cannot access the state.
-        */
-       @Override
-       public Iterable<V> values() {
-               return Collections.unmodifiableCollection(state.values());
-       }
-
-       /**
-        * Iterates over all the mappings in the state. The iterator cannot
-        * remove elements.
-        *
-        * @return A read-only iterator over all the mappings in the state
-        *
-        * @throws Exception Thrown if the system cannot access the state.
-        */
-       @Override
-       public Iterator<Map.Entry<K, V>> iterator() {
-               return Collections.unmodifiableSet(state.entrySet()).iterator();
-       }
-
-       @Override
-       public void clear() {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       public static <K, V> ImmutableMapState<K, V> createState(
-                       final MapStateDescriptor<K, V> stateDescriptor,
-                       final byte[] serializedState) throws IOException {
-
-               final Map<K, V> state = KvStateSerializer.deserializeMap(
-                               serializedState,
-                               stateDescriptor.getKeySerializer(),
-                               stateDescriptor.getValueSerializer());
-               return new ImmutableMapState<>(state);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
deleted file mode 100644
index da08c53..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * A read-only {@link ReducingState} that does not allow for modifications.
- *
- * <p>This is the result returned when querying Flink's keyed state using the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
- * providing an {@link ReducingStateDescriptor}.
- */
-@PublicEvolving
-public final class ImmutableReducingState<V> extends ImmutableState implements 
ReducingState<V> {
-
-       private final V value;
-
-       private ImmutableReducingState(V value) {
-               this.value = Preconditions.checkNotNull(value);
-       }
-
-       @Override
-       public V get() {
-               return value;
-       }
-
-       @Override
-       public void add(V newValue) {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       @Override
-       public void clear() {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       public static <V> ImmutableReducingState<V> createState(
-                       final ReducingStateDescriptor<V> stateDescriptor,
-                       final byte[] serializedState) throws IOException {
-
-               final V state = KvStateSerializer.deserializeValue(
-                               serializedState,
-                               stateDescriptor.getSerializer());
-               return new ImmutableReducingState<>(state);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
deleted file mode 100644
index 863f07b..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-/**
- * A base class for the <b>read-only</b> types of state returned
- * as results from the Queryable State Client.
- */
-abstract class ImmutableState {
-
-       protected static final UnsupportedOperationException 
MODIFICATION_ATTEMPT_ERROR =
-                       new UnsupportedOperationException("State is read-only. 
No modifications allowed.");
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
deleted file mode 100644
index 6ce2787..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateBinder;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A {@link StateBinder} used to deserialize the results returned by the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient}.
- *
- * <p>The result is an immutable {@link 
org.apache.flink.api.common.state.State State}
- * object containing the requested result.
- */
-public class ImmutableStateBinder implements StateBinder {
-
-       private final byte[] serializedState;
-
-       public ImmutableStateBinder(final byte[] content) {
-               serializedState = Preconditions.checkNotNull(content);
-       }
-
-       @Override
-       public <T> ValueState<T> createValueState(ValueStateDescriptor<T> 
stateDesc) throws Exception {
-               return ImmutableValueState.createState(stateDesc, 
serializedState);
-       }
-
-       @Override
-       public <T> ListState<T> createListState(ListStateDescriptor<T> 
stateDesc) throws Exception {
-               return ImmutableListState.createState(stateDesc, 
serializedState);
-       }
-
-       @Override
-       public <T> ReducingState<T> 
createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
-               return ImmutableReducingState.createState(stateDesc, 
serializedState);
-       }
-
-       @Override
-       public <IN, ACC, OUT> AggregatingState<IN, OUT> 
createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) 
throws Exception {
-               return ImmutableAggregatingState.createState(stateDesc, 
serializedState);
-       }
-
-       @Override
-       public <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-               return ImmutableFoldingState.createState(stateDesc, 
serializedState);
-       }
-
-       @Override
-       public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, 
MV> stateDesc) throws Exception {
-               return ImmutableMapState.createState(stateDesc, 
serializedState);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
deleted file mode 100644
index 7fd6457..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * A read-only {@link ValueState} that does not allow for modifications.
- *
- * <p>This is the result returned when querying Flink's keyed state using the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient 
Queryable State Client} and
- * providing an {@link ValueStateDescriptor}.
- */
-@PublicEvolving
-public final class ImmutableValueState<V> extends ImmutableState implements 
ValueState<V> {
-
-       private final V value;
-
-       private ImmutableValueState(V value) {
-               this.value = Preconditions.checkNotNull(value);
-       }
-
-       @Override
-       public V value() {
-               return value;
-       }
-
-       @Override
-       public void update(V newValue) {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       @Override
-       public void clear() {
-               throw MODIFICATION_ATTEMPT_ERROR;
-       }
-
-       public static <V> ImmutableValueState<V> createState(
-                       final ValueStateDescriptor<V> stateDescriptor,
-                       final byte[] serializedState) throws IOException {
-
-               final V state = KvStateSerializer.deserializeValue(
-                               serializedState,
-                               stateDescriptor.getSerializer());
-               return new ImmutableValueState<>(state);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
deleted file mode 100644
index eedc2a1..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.messages;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-
-import java.nio.ByteBuffer;
-
-/**
- * The request to be forwarded by the {@link 
org.apache.flink.runtime.query.KvStateClientProxy
- * Queryable State Client Proxy} to the {@link 
org.apache.flink.runtime.query.KvStateServer State Server}
- * of the Task Manager responsible for the requested state.
- */
-@Internal
-public class KvStateInternalRequest extends MessageBody {
-
-       private final KvStateID kvStateId;
-       private final byte[] serializedKeyAndNamespace;
-
-       public KvStateInternalRequest(
-                       final KvStateID stateId,
-                       final byte[] serializedKeyAndNamespace) {
-
-               this.kvStateId = Preconditions.checkNotNull(stateId);
-               this.serializedKeyAndNamespace = 
Preconditions.checkNotNull(serializedKeyAndNamespace);
-       }
-
-       public KvStateID getKvStateId() {
-               return kvStateId;
-       }
-
-       public byte[] getSerializedKeyAndNamespace() {
-               return serializedKeyAndNamespace;
-       }
-
-       @Override
-       public byte[] serialize() {
-
-               // KvStateId + sizeOf(serializedKeyAndNamespace) + 
serializedKeyAndNamespace
-               final int size = KvStateID.SIZE + Integer.BYTES + 
serializedKeyAndNamespace.length;
-
-               return ByteBuffer.allocate(size)
-                               .putLong(kvStateId.getLowerPart())
-                               .putLong(kvStateId.getUpperPart())
-                               .putInt(serializedKeyAndNamespace.length)
-                               .put(serializedKeyAndNamespace)
-                               .array();
-       }
-
-       /**
-        * A {@link MessageDeserializer deserializer} for {@link 
KvStateInternalRequest}.
-        */
-       public static class KvStateInternalRequestDeserializer implements 
MessageDeserializer<KvStateInternalRequest> {
-
-               @Override
-               public KvStateInternalRequest deserializeMessage(ByteBuf buf) {
-                       KvStateID kvStateId = new KvStateID(buf.readLong(), 
buf.readLong());
-
-                       int length = buf.readInt();
-                       Preconditions.checkArgument(length >= 0,
-                                       "Negative length for key and namespace. 
" +
-                                                       "This indicates a 
serialization error.");
-
-                       byte[] serializedKeyAndNamespace = new byte[length];
-                       if (length > 0) {
-                               buf.readBytes(serializedKeyAndNamespace);
-                       }
-                       return new KvStateInternalRequest(kvStateId, 
serializedKeyAndNamespace);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
deleted file mode 100644
index 7eb39c7..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.messages;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-/**
- * The request to be sent by the {@link 
org.apache.flink.queryablestate.client.QueryableStateClient
- * Queryable State Client} to the {@link 
org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}
- * requesting a given state.
- */
-@Internal
-public class KvStateRequest extends MessageBody {
-
-       private final JobID jobId;
-       private final String stateName;
-       private final int keyHashCode;
-       private final byte[] serializedKeyAndNamespace;
-
-       public KvStateRequest(
-                       final JobID jobId,
-                       final String stateName,
-                       final int keyHashCode,
-                       final byte[] serializedKeyAndNamespace) {
-
-               this.jobId = Preconditions.checkNotNull(jobId);
-               this.stateName = Preconditions.checkNotNull(stateName);
-               this.keyHashCode = keyHashCode;
-               this.serializedKeyAndNamespace = 
Preconditions.checkNotNull(serializedKeyAndNamespace);
-       }
-
-       public JobID getJobId() {
-               return jobId;
-       }
-
-       public String getStateName() {
-               return stateName;
-       }
-
-       public int getKeyHashCode() {
-               return keyHashCode;
-       }
-
-       public byte[] getSerializedKeyAndNamespace() {
-               return serializedKeyAndNamespace;
-       }
-
-       @Override
-       public byte[] serialize() {
-
-               byte[] serializedStateName = stateName.getBytes();
-
-               // JobID + stateName + sizeOf(stateName) + hashCode + 
keyAndNamespace + sizeOf(keyAndNamespace)
-               final int size =
-                               JobID.SIZE +
-                               serializedStateName.length + Integer.BYTES +
-                               Integer.BYTES +
-                               serializedKeyAndNamespace.length + 
Integer.BYTES;
-
-               return ByteBuffer.allocate(size)
-                               .putLong(jobId.getLowerPart())
-                               .putLong(jobId.getUpperPart())
-                               .putInt(serializedStateName.length)
-                               .put(serializedStateName)
-                               .putInt(keyHashCode)
-                               .putInt(serializedKeyAndNamespace.length)
-                               .put(serializedKeyAndNamespace)
-                               .array();
-       }
-
-       @Override
-       public String toString() {
-               return "KvStateRequest{" +
-                               "jobId=" + jobId +
-                               ", stateName='" + stateName + '\'' +
-                               ", keyHashCode=" + keyHashCode +
-                               ", serializedKeyAndNamespace=" + 
Arrays.toString(serializedKeyAndNamespace) +
-                               '}';
-       }
-
-       /**
-        * A {@link MessageDeserializer deserializer} for {@link 
KvStateRequest}.
-        */
-       public static class KvStateRequestDeserializer implements 
MessageDeserializer<KvStateRequest> {
-
-               @Override
-               public KvStateRequest deserializeMessage(ByteBuf buf) {
-                       JobID jobId = new JobID(buf.readLong(), buf.readLong());
-
-                       int statenameLength = buf.readInt();
-                       Preconditions.checkArgument(statenameLength >= 0,
-                                       "Negative length for state name. " +
-                                                       "This indicates a 
serialization error.");
-
-                       String stateName = "";
-                       if (statenameLength > 0) {
-                               byte[] name = new byte[statenameLength];
-                               buf.readBytes(name);
-                               stateName = new String(name);
-                       }
-
-                       int keyHashCode = buf.readInt();
-
-                       int knamespaceLength = buf.readInt();
-                       Preconditions.checkArgument(knamespaceLength >= 0,
-                                       "Negative length for key and namespace. 
" +
-                                                       "This indicates a 
serialization error.");
-
-                       byte[] serializedKeyAndNamespace = new 
byte[knamespaceLength];
-                       if (knamespaceLength > 0) {
-                               buf.readBytes(serializedKeyAndNamespace);
-                       }
-                       return new KvStateRequest(jobId, stateName, 
keyHashCode, serializedKeyAndNamespace);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
deleted file mode 100644
index 462135f..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.messages;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-
-import java.nio.ByteBuffer;
-
-/**
- * The response containing the (serialized) state sent by the {@link 
org.apache.flink.runtime.query.KvStateServer
- * State Server} to the {@link 
org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}, and then 
forwarded
- * by the proxy to the original {@link 
org.apache.flink.queryablestate.client.QueryableStateClient Queryable State
- * Client}.
- */
-@Internal
-public class KvStateResponse extends MessageBody {
-
-       private final byte[] content;
-
-       public KvStateResponse(final byte[] content) {
-               this.content = Preconditions.checkNotNull(content);
-       }
-
-       public byte[] getContent() {
-               return content;
-       }
-
-       @Override
-       public byte[] serialize() {
-               final int size = Integer.BYTES + content.length;
-               return ByteBuffer.allocate(size)
-                               .putInt(content.length)
-                               .put(content)
-                               .array();
-       }
-
-       /**
-        * A {@link MessageDeserializer deserializer} for {@link 
KvStateResponseDeserializer}.
-        */
-       public static class KvStateResponseDeserializer implements 
MessageDeserializer<KvStateResponse> {
-
-               @Override
-               public KvStateResponse deserializeMessage(ByteBuf buf) {
-                       int length = buf.readInt();
-                       Preconditions.checkArgument(length >= 0,
-                                       "Negative length for state content. " +
-                                                       "This indicates a 
serialization error.");
-                       byte[] content = new byte[length];
-                       buf.readBytes(content);
-
-                       return new KvStateResponse(content);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
deleted file mode 100644
index be852fb..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-
-import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The base class for every server in the queryable state module.
- * It is using pure netty to send and receive messages of type {@link 
MessageBody}.
- *
- * @param <REQ> the type of request the server expects to receive.
- * @param <RESP> the type of response the server will send.
- */
-@Internal
-public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends 
MessageBody> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(AbstractServerBase.class);
-
-       /** AbstractServerBase config: low water mark. */
-       private static final int LOW_WATER_MARK = 8 * 1024;
-
-       /** AbstractServerBase config: high water mark. */
-       private static final int HIGH_WATER_MARK = 32 * 1024;
-
-       /** The name of the server, useful for debugging. */
-       private final String serverName;
-
-       /** The {@link InetAddress address} to listen to. */
-       private final InetAddress bindAddress;
-
-       /** A port range on which to try to connect. */
-       private final Set<Integer> bindPortRange;
-
-       /** The number of threads to be allocated to the event loop. */
-       private final int numEventLoopThreads;
-
-       /** The number of threads to be used for query serving. */
-       private final int numQueryThreads;
-
-       /** Netty's ServerBootstrap. */
-       private ServerBootstrap bootstrap;
-
-       /** Query executor thread pool. */
-       private ExecutorService queryExecutor;
-
-       /** Address of this server. */
-       private KvStateServerAddress serverAddress;
-
-       /** The handler used for the incoming messages. */
-       private AbstractServerHandler<REQ, RESP> handler;
-
-       /**
-        * Creates the {@link AbstractServerBase}.
-        *
-        * <p>The server needs to be started via {@link #start()}.
-        *
-        * @param serverName the name of the server
-        * @param bindAddress address to bind to
-        * @param bindPortIterator port to bind to
-        * @param numEventLoopThreads number of event loop threads
-        */
-       protected AbstractServerBase(
-                       final String serverName,
-                       final InetAddress bindAddress,
-                       final Iterator<Integer> bindPortIterator,
-                       final Integer numEventLoopThreads,
-                       final Integer numQueryThreads) {
-
-               Preconditions.checkNotNull(bindPortIterator);
-               Preconditions.checkArgument(numEventLoopThreads >= 1, 
"Non-positive number of event loop threads.");
-               Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive 
number of query threads.");
-
-               this.serverName = Preconditions.checkNotNull(serverName);
-               this.bindAddress = Preconditions.checkNotNull(bindAddress);
-               this.numEventLoopThreads = numEventLoopThreads;
-               this.numQueryThreads = numQueryThreads;
-
-               this.bindPortRange = new HashSet<>();
-               while (bindPortIterator.hasNext()) {
-                       int port = bindPortIterator.next();
-                       Preconditions.checkArgument(port >= 0 && port <= 65535,
-                                       "Invalid port configuration. Port must 
be between 0 and 65535, but was " + port + ".");
-                       bindPortRange.add(port);
-               }
-       }
-
-       /**
-        * Creates a thread pool for the query execution.
-        * @return Thread pool for query execution
-        */
-       private ExecutorService createQueryExecutor() {
-               ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                               .setDaemon(true)
-                               .setNameFormat("Flink " + getServerName() + " 
Thread %d")
-                               .build();
-               return Executors.newFixedThreadPool(numQueryThreads, 
threadFactory);
-       }
-
-       /**
-        * Returns the thread-pool responsible for processing incoming requests.
-        */
-       protected ExecutorService getQueryExecutor() {
-               return queryExecutor;
-       }
-
-       /**
-        * Gets the name of the server. This is useful for debugging.
-        * @return The name of the server.
-        */
-       public String getServerName() {
-               return serverName;
-       }
-
-       /**
-        * Returns the {@link AbstractServerHandler handler} to be used for
-        * serving the incoming requests.
-        */
-       public abstract AbstractServerHandler<REQ, RESP> initializeHandler();
-
-       /**
-        * Returns the address of this server.
-        *
-        * @return AbstractServerBase address
-        * @throws IllegalStateException If server has not been started yet
-        */
-       public KvStateServerAddress getServerAddress() {
-               Preconditions.checkState(serverAddress != null, "Server " + 
serverName + " has not been started.");
-               return serverAddress;
-       }
-
-       /**
-        * Starts the server by binding to the configured bind address 
(blocking).
-        * @throws Exception If something goes wrong during the bind operation.
-        */
-       public void start() throws Throwable {
-               Preconditions.checkState(serverAddress == null,
-                               "Server " + serverName + " already running @ " 
+ serverAddress + '.');
-
-               Iterator<Integer> portIterator = bindPortRange.iterator();
-               while (portIterator.hasNext() && 
!attemptToBind(portIterator.next())) {}
-
-               if (serverAddress != null) {
-                       LOG.info("Started server {} @ {}.", serverName, 
serverAddress);
-               } else {
-                       LOG.info("Unable to start server {}. All ports in 
provided range are occupied.", serverName);
-                       throw new FlinkRuntimeException("Unable to start server 
" + serverName + ". All ports in provided range are occupied.");
-               }
-       }
-
-       /**
-        * Tries to start the server at the provided port.
-        *
-        * <p>This, in conjunction with {@link #start()}, try to start the
-        * server on a free port among the port range provided at the 
constructor.
-        *
-        * @param port the port to try to bind the server to.
-        * @throws Exception If something goes wrong during the bind operation.
-        */
-       private boolean attemptToBind(final int port) throws Throwable {
-               LOG.debug("Attempting to start server {} on port {}.", 
serverName, port);
-
-               this.queryExecutor = createQueryExecutor();
-               this.handler = initializeHandler();
-
-               final NettyBufferPool bufferPool = new 
NettyBufferPool(numEventLoopThreads);
-
-               final ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                               .setDaemon(true)
-                               .setNameFormat("Flink " + serverName + " 
EventLoop Thread %d")
-                               .build();
-
-               final NioEventLoopGroup nioGroup = new 
NioEventLoopGroup(numEventLoopThreads, threadFactory);
-
-               this.bootstrap = new ServerBootstrap()
-                               .localAddress(bindAddress, port)
-                               .group(nioGroup)
-                               .channel(NioServerSocketChannel.class)
-                               .option(ChannelOption.ALLOCATOR, bufferPool)
-                               .childOption(ChannelOption.ALLOCATOR, 
bufferPool)
-                               
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
-                               
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
-                               .childHandler(new 
ServerChannelInitializer<>(handler));
-
-               try {
-                       final ChannelFuture future = bootstrap.bind().sync();
-                       if (future.isSuccess()) {
-                               final InetSocketAddress localAddress = 
(InetSocketAddress) future.channel().localAddress();
-                               serverAddress = new 
KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
-                               return true;
-                       }
-
-                       // the following throw is to bypass Netty's 
"optimization magic"
-                       // and catch the bind exception.
-                       // the exception is thrown by the sync() call above.
-
-                       throw future.cause();
-               } catch (BindException e) {
-                       LOG.debug("Failed to start server {} on port {}: {}.", 
serverName, port, e.getMessage());
-                       shutdown();
-               }
-               // any other type of exception we let it bubble up.
-               return false;
-       }
-
-       /**
-        * Shuts down the server and all related thread pools.
-        */
-       public void shutdown() {
-               LOG.info("Shutting down server {} @ {}", serverName, 
serverAddress);
-
-               if (handler != null) {
-                       handler.shutdown();
-                       handler = null;
-               }
-
-               if (queryExecutor != null) {
-                       queryExecutor.shutdown();
-               }
-
-               if (bootstrap != null) {
-                       EventLoopGroup group = bootstrap.group();
-                       if (group != null) {
-                               group.shutdownGracefully(0L, 10L, 
TimeUnit.SECONDS);
-                       }
-               }
-               serverAddress = null;
-       }
-
-       /**
-        * Channel pipeline initializer.
-        *
-        * <p>The request handler is shared, whereas the other handlers are 
created
-        * per channel.
-        */
-       private static final class ServerChannelInitializer<REQ extends 
MessageBody, RESP extends MessageBody> extends 
ChannelInitializer<SocketChannel> {
-
-               /** The shared request handler. */
-               private final AbstractServerHandler<REQ, RESP> 
sharedRequestHandler;
-
-               /**
-                * Creates the channel pipeline initializer with the shared 
request handler.
-                *
-                * @param sharedRequestHandler Shared request handler.
-                */
-               ServerChannelInitializer(AbstractServerHandler<REQ, RESP> 
sharedRequestHandler) {
-                       this.sharedRequestHandler = 
Preconditions.checkNotNull(sharedRequestHandler, "MessageBody handler");
-               }
-
-               @Override
-               protected void initChannel(SocketChannel channel) throws 
Exception {
-                       channel.pipeline()
-                                       .addLast(new ChunkedWriteHandler())
-                                       .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-                                       .addLast(sharedRequestHandler);
-               }
-       }
-
-       @VisibleForTesting
-       public boolean isExecutorShutdown() {
-               return queryExecutor.isShutdown();
-       }
-}

Reply via email to