[FLINK-7788][QS] Allow specification of port range for queryable state proxy.

The queryable state client proxy can now take a port range as argument
so that if multiple proxies run on one machine, they can all start without
problems.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5338f856
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5338f856
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5338f856

Branch: refs/heads/master
Commit: 5338f856b42422189246130cc245754162fa9913
Parents: 9772e03
Author: kkloudas <kklou...@gmail.com>
Authored: Fri Oct 6 12:08:47 2017 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Wed Oct 18 08:40:28 2017 +0200

----------------------------------------------------------------------
 .../configuration/QueryableStateOptions.java    |  25 ++-
 .../client/proxy/KvStateClientProxyImpl.java    |  13 +-
 .../network/AbstractServerBase.java             | 196 ++++++++++++-----
 .../server/KvStateServerImpl.java               |   4 +-
 .../itcases/AbstractQueryableStateITCase.java   |  18 +-
 .../itcases/HAAbstractQueryableStateITCase.java |   3 +-
 .../NonHAAbstractQueryableStateITCase.java      |   4 +-
 .../network/AbstractServerTest.java             | 219 +++++++++++++++++++
 .../queryablestate/network/ClientTest.java      |   2 +-
 .../network/KvStateServerHandlerTest.java       |   7 +-
 .../network/KvStateServerTest.java              |   2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  12 +-
 .../flink/runtime/query/KvStateServer.java      |   2 +-
 .../runtime/query/QueryableStateUtils.java      |  11 +-
 .../QueryableStateConfiguration.java            |  22 +-
 .../taskexecutor/TaskManagerServices.java       |   4 +-
 .../TaskManagerServicesConfiguration.java       |  12 +-
 17 files changed, 450 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index df850e9..2dd4cca 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -37,10 +37,27 @@ public class QueryableStateOptions {
                        key("query.server.enable")
                        .defaultValue(true);
 
-       /** Port to bind KvState server to (0 => pick random available port). */
-       public static final ConfigOption<Integer> SERVER_PORT =
+       /**
+        * The config parameter defining the server port range of the queryable 
state proxy.
+        *
+        * <p>A proxy runs on each Task Manager, so many proxies may run on the 
same
+        * machine.
+        *
+        * <p>Given this, and to avoid port clashes, the user can specify a 
port range and
+        * the proxy is going to bind to the first free port in that range.
+        *
+        * <p>The specified range can be:
+        * <ol>
+        *     <li>a port: "9123",
+        *     <li>a range of ports: "50100-50200", or
+        *     <li>a list of ranges and or points: 
"50100-50200,50300-50400,51234"
+        * </ol>
+        *
+        * <p><b>The default port is 9069.</b>
+        */
+       public static final ConfigOption<String> PROXY_PORT_RANGE =
                        key("query.server.port")
-                       .defaultValue(9069);
+                       .defaultValue("9069");
 
        /** Number of network (event loop) threads for the KvState server (0 => 
#slots). */
        public static final ConfigOption<Integer> SERVER_NETWORK_THREADS =
@@ -73,7 +90,7 @@ public class QueryableStateOptions {
 
        // 
------------------------------------------------------------------------
 
-       /** Not intended to be instantiated */
+       /** Not intended to be instantiated. */
        private QueryableStateOptions() {
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
index bca80de..2e0c287 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.query.netty.KvStateRequestStats;
 import org.apache.flink.util.Preconditions;
 
 import java.net.InetAddress;
+import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -58,26 +59,26 @@ public class KvStateClientProxyImpl extends 
AbstractServerBase<KvStateRequest, K
         * Creates the Queryable State Client Proxy.
         *
         * <p>The server is instantiated using reflection by the
-        * {@link 
org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress,
 int, int, int, KvStateRequestStats)
-        * QueryableStateUtils.startKvStateClientProxy(InetAddress, int, int, 
int, KvStateRequestStats)}.
+        * {@link 
org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress,
 Iterator, int, int, KvStateRequestStats)
+        * QueryableStateUtils.createKvStateClientProxy(InetAddress, Iterator, 
int, int, KvStateRequestStats)}.
         *
         * <p>The server needs to be started via {@link #start()} in order to 
bind
         * to the configured bind address.
         *
         * @param bindAddress the address to listen to.
-        * @param bindPort the port to listen to.
+        * @param bindPortIterator the port to listen to.
         * @param numEventLoopThreads number of event loop threads.
         * @param numQueryThreads number of query threads.
         * @param stats the statistics collector.
         */
        public KvStateClientProxyImpl(
                        final InetAddress bindAddress,
-                       final Integer bindPort,
+                       final Iterator<Integer> bindPortIterator,
                        final Integer numEventLoopThreads,
                        final Integer numQueryThreads,
                        final KvStateRequestStats stats) {
 
-               super("Queryable State Proxy Server", bindAddress, bindPort, 
numEventLoopThreads, numQueryThreads);
+               super("Queryable State Proxy Server", bindAddress, 
bindPortIterator, numEventLoopThreads, numQueryThreads);
                Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive 
number of query threads.");
                this.queryExecutorThreads = numQueryThreads;
                this.stats = Preconditions.checkNotNull(stats);
@@ -89,7 +90,7 @@ public class KvStateClientProxyImpl extends 
AbstractServerBase<KvStateRequest, K
        }
 
        @Override
-       public void start() throws InterruptedException {
+       public void start() throws Throwable {
                super.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 4bf8e98..8df42f7 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -23,11 +23,12 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
 import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
@@ -40,8 +41,12 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandle
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -65,13 +70,26 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
        /** AbstractServerBase config: high water mark. */
        private static final int HIGH_WATER_MARK = 32 * 1024;
 
+       /** The name of the server, useful for debugging. */
        private final String serverName;
 
+       /** The {@link InetAddress address} to listen to. */
+       private final InetAddress bindAddress;
+
+       /** A port range on which to try to connect. */
+       private final Set<Integer> bindPortRange;
+
+       /** The number of threads to be allocated to the event loop. */
+       private final int numEventLoopThreads;
+
+       /** The number of threads to be used for query serving. */
+       private final int numQueryThreads;
+
        /** Netty's ServerBootstrap. */
-       private final ServerBootstrap bootstrap;
+       private ServerBootstrap bootstrap;
 
        /** Query executor thread pool. */
-       private final ExecutorService queryExecutor;
+       private ExecutorService queryExecutor;
 
        /** Address of this server. */
        private KvStateServerAddress serverAddress;
@@ -82,12 +100,11 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
        /**
         * Creates the {@link AbstractServerBase}.
         *
-        * <p>The server needs to be started via {@link #start()} in order to 
bind
-        * to the configured bind address.
+        * <p>The server needs to be started via {@link #start()}.
         *
         * @param serverName the name of the server
         * @param bindAddress address to bind to
-        * @param bindPort port to bind to (random port if 0)
+        * @param bindPort port to bind to
         * @param numEventLoopThreads number of event loop threads
         */
        protected AbstractServerBase(
@@ -96,90 +113,166 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
                        final Integer bindPort,
                        final Integer numEventLoopThreads,
                        final Integer numQueryThreads) {
+               this(
+                               serverName,
+                               bindAddress,
+                               Collections.singleton(bindPort).iterator(),
+                               numEventLoopThreads,
+                               numQueryThreads
+               );
+       }
+
+       /**
+        * Creates the {@link AbstractServerBase}.
+        *
+        * <p>The server needs to be started via {@link #start()}.
+        *
+        * @param serverName the name of the server
+        * @param bindAddress address to bind to
+        * @param bindPortIterator port to bind to
+        * @param numEventLoopThreads number of event loop threads
+        */
+       protected AbstractServerBase(
+                       final String serverName,
+                       final InetAddress bindAddress,
+                       final Iterator<Integer> bindPortIterator,
+                       final Integer numEventLoopThreads,
+                       final Integer numQueryThreads) {
 
-               Preconditions.checkNotNull(bindAddress);
-               Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, 
"Port " + bindPort + " out of valid range (0-65536).");
+               Preconditions.checkNotNull(bindPortIterator);
                Preconditions.checkArgument(numEventLoopThreads >= 1, 
"Non-positive number of event loop threads.");
                Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive 
number of query threads.");
 
                this.serverName = Preconditions.checkNotNull(serverName);
-               this.queryExecutor = createQueryExecutor(numQueryThreads);
-
-               final NettyBufferPool bufferPool = new 
NettyBufferPool(numEventLoopThreads);
-
-               final ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                               .setDaemon(true)
-                               .setNameFormat("Flink " + serverName + " 
EventLoop Thread %d")
-                               .build();
-
-               final NioEventLoopGroup nioGroup = new 
NioEventLoopGroup(numEventLoopThreads, threadFactory);
-
-               bootstrap = new ServerBootstrap()
-                               // Bind address and port
-                               .localAddress(bindAddress, bindPort)
-                               // NIO server channels
-                               .group(nioGroup)
-                               .channel(NioServerSocketChannel.class)
-                               // AbstractServerBase channel Options
-                               .option(ChannelOption.ALLOCATOR, bufferPool)
-                               // Child channel options
-                               .childOption(ChannelOption.ALLOCATOR, 
bufferPool)
-                               
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
-                               
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
+               this.bindAddress = Preconditions.checkNotNull(bindAddress);
+               this.numEventLoopThreads = numEventLoopThreads;
+               this.numQueryThreads = numQueryThreads;
+
+               this.bindPortRange = new HashSet<>();
+               while (bindPortIterator.hasNext()) {
+                       int port = bindPortIterator.next();
+                       Preconditions.checkArgument(port >= 0 && port <= 65535,
+                                       "Invalid port configuration. Port must 
be between 0 and 65535, but was " + port + ".");
+                       bindPortRange.add(port);
+               }
        }
 
        /**
         * Creates a thread pool for the query execution.
-        *
-        * @param numQueryThreads Number of query threads.
         * @return Thread pool for query execution
         */
-       private ExecutorService createQueryExecutor(int numQueryThreads) {
+       private ExecutorService createQueryExecutor() {
                ThreadFactory threadFactory = new ThreadFactoryBuilder()
                                .setDaemon(true)
                                .setNameFormat("Flink " + getServerName() + " 
Thread %d")
                                .build();
-
                return Executors.newFixedThreadPool(numQueryThreads, 
threadFactory);
        }
 
+       /**
+        * Returns the thread-pool responsible for processing incoming requests.
+        */
        protected ExecutorService getQueryExecutor() {
                return queryExecutor;
        }
 
+       /**
+        * Gets the name of the server. This is useful for debugging.
+        * @return The name of the server.
+        */
        public String getServerName() {
                return serverName;
        }
 
+       /**
+        * Returns the {@link AbstractServerHandler handler} to be used for
+        * serving the incoming requests.
+        */
        public abstract AbstractServerHandler<REQ, RESP> initializeHandler();
 
        /**
+        * Returns the address of this server.
+        *
+        * @return AbstractServerBase address
+        * @throws IllegalStateException If server has not been started yet
+        */
+       public KvStateServerAddress getServerAddress() {
+               Preconditions.checkState(serverAddress != null, "Server " + 
serverName + " has not been started.");
+               return serverAddress;
+       }
+
+       /**
         * Starts the server by binding to the configured bind address 
(blocking).
-        * @throws InterruptedException If interrupted during the bind operation
+        * @throws Exception If something goes wrong during the bind operation.
         */
-       public void start() throws InterruptedException {
+       public void start() throws Throwable {
                Preconditions.checkState(serverAddress == null,
-                               "Server " + serverName + " has already been 
started @ " + serverAddress + '.');
+                               "Server " + serverName + " already running @ " 
+ serverAddress + '.');
 
-               this.handler = initializeHandler();
-               bootstrap.childHandler(new ServerChannelInitializer<>(handler));
-
-               Channel channel = bootstrap.bind().sync().channel();
-               InetSocketAddress localAddress = (InetSocketAddress) 
channel.localAddress();
-               serverAddress = new 
KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+               Iterator<Integer> portIterator = bindPortRange.iterator();
+               while (portIterator.hasNext() && 
!attemptToBind(portIterator.next())) {}
 
-               LOG.info("Started server {} @ {}", serverName, serverAddress);
+               if (serverAddress != null) {
+                       LOG.info("Started server {} @ {}.", serverName, 
serverAddress);
+               } else {
+                       LOG.info("Unable to start server {}. All ports in 
provided range are occupied.", serverName);
+                       throw new FlinkRuntimeException("Unable to start server 
" + serverName + ". All ports in provided range are occupied.");
+               }
        }
 
        /**
-        * Returns the address of this server.
+        * Tries to start the server at the provided port.
         *
-        * @return AbstractServerBase address
-        * @throws IllegalStateException If server has not been started yet
+        * <p>This, in conjunction with {@link #start()}, try to start the
+        * server on a free port among the port range provided at the 
constructor.
+        *
+        * @param port the port to try to bind the server to.
+        * @throws Exception If something goes wrong during the bind operation.
         */
-       public KvStateServerAddress getServerAddress() {
-               Preconditions.checkState(serverAddress != null, "Server " + 
serverName + " has not been started.");
-               return serverAddress;
+       private boolean attemptToBind(final int port) throws Throwable {
+               LOG.debug("Attempting to start server {} on port {}.", 
serverName, port);
+
+               this.queryExecutor = createQueryExecutor();
+               this.handler = initializeHandler();
+
+               final NettyBufferPool bufferPool = new 
NettyBufferPool(numEventLoopThreads);
+
+               final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                               .setDaemon(true)
+                               .setNameFormat("Flink " + serverName + " 
EventLoop Thread %d")
+                               .build();
+
+               final NioEventLoopGroup nioGroup = new 
NioEventLoopGroup(numEventLoopThreads, threadFactory);
+
+               this.bootstrap = new ServerBootstrap()
+                               .localAddress(bindAddress, port)
+                               .group(nioGroup)
+                               .channel(NioServerSocketChannel.class)
+                               .option(ChannelOption.ALLOCATOR, bufferPool)
+                               .childOption(ChannelOption.ALLOCATOR, 
bufferPool)
+                               
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
+                               
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
+                               .childHandler(new 
ServerChannelInitializer<>(handler));
+
+               try {
+                       final ChannelFuture future = bootstrap.bind().sync();
+                       if (future.isSuccess()) {
+                               final InetSocketAddress localAddress = 
(InetSocketAddress) future.channel().localAddress();
+                               serverAddress = new 
KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+                               return true;
+                       }
+
+                       // the following throw is to bypass Netty's 
"optimization magic"
+                       // and catch the bind exception.
+                       // the exception is thrown by the sync() call above.
+
+                       throw future.cause();
+               } catch (BindException e) {
+                       LOG.debug("Failed to start server {} on port {}: {}.", 
serverName, port, e.getMessage());
+                       shutdown();
+               }
+               // any other type of exception we let it bubble up.
+               return false;
        }
 
        /**
@@ -190,6 +283,7 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
 
                if (handler != null) {
                        handler.shutdown();
+                       handler = null;
                }
 
                if (queryExecutor != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index b4c548a..1673015 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -55,7 +55,7 @@ public class KvStateServerImpl extends 
AbstractServerBase<KvStateInternalRequest
         *
         * <p>The server is instantiated using reflection by the
         * {@link 
org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress,
 int, int, int, KvStateRegistry, KvStateRequestStats)
-        * QueryableStateUtils.startKvStateServer(InetAddress, int, int, int, 
KvStateRegistry, KvStateRequestStats)}.
+        * QueryableStateUtils.createKvStateServer(InetAddress, int, int, int, 
KvStateRegistry, KvStateRequestStats)}.
         *
         * <p>The server needs to be started via {@link #start()} in order to 
bind
         * to the configured bind address.
@@ -94,7 +94,7 @@ public class KvStateServerImpl extends 
AbstractServerBase<KvStateInternalRequest
        }
 
        @Override
-       public void start() throws InterruptedException {
+       public void start() throws Throwable {
                super.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
index 7ff4ec6..a096f55 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
@@ -147,7 +147,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                final QueryableStateClient client = new QueryableStateClient(
                                "localhost",
-                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
 
                JobID jobId = null;
 
@@ -371,7 +371,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                final QueryableStateClient client = new QueryableStateClient(
                                "localhost",
-                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
 
                JobID jobId = null;
                try {
@@ -435,7 +435,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                final QueryableStateClient client = new QueryableStateClient(
                                "localhost",
-                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
 
                JobID jobId = null;
                try {
@@ -600,7 +600,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                final QueryableStateClient client = new QueryableStateClient(
                                "localhost",
-                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
 
                JobID jobId = null;
                try {
@@ -698,7 +698,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                final QueryableStateClient client = new QueryableStateClient(
                                "localhost",
-                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
 
                JobID jobId = null;
                try {
@@ -764,7 +764,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                final QueryableStateClient client = new QueryableStateClient(
                                "localhost",
-                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
 
                JobID jobId = null;
                try {
@@ -774,7 +774,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
 
                        DataStream<Tuple2<Integer, Long>> source = env
                                        .addSource(new 
TestAscendingValueSource(numElements));
@@ -861,7 +861,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                final QueryableStateClient client = new QueryableStateClient(
                                "localhost",
-                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
+                               
Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
 
                JobID jobId = null;
                try {
@@ -871,7 +871,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
 
                        DataStream<Tuple2<Integer, Long>> source = env
                                        .addSource(new 
TestAscendingValueSource(numElements));

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
index a2a9678..bcb6be4 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
@@ -39,7 +39,7 @@ import static org.junit.Assert.fail;
 public abstract class HAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
 
        private static final int NUM_JMS = 2;
-       private static final int NUM_TMS = 1;
+       private static final int NUM_TMS = 2;
        private static final int NUM_SLOTS_PER_TM = 4;
 
        private static TestingServer zkServer;
@@ -59,6 +59,7 @@ public abstract class HAAbstractQueryableStateITCase extends 
AbstractQueryableSt
                        config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
                        
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
                        
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
+                       
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + 
NUM_TMS));
                        
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
                        
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
                        config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
index 1173d0d..55f1841 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
@@ -36,7 +36,7 @@ import static org.junit.Assert.fail;
  */
 public abstract class NonHAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
 
-       private static final int NUM_TMS = 1;
+       private static final int NUM_TMS = 2;
        private static final int NUM_SLOTS_PER_TM = 4;
 
        @BeforeClass
@@ -47,9 +47,9 @@ public abstract class NonHAAbstractQueryableStateITCase 
extends AbstractQueryabl
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
                        
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
-                       config.setInteger(QueryableStateOptions.SERVER_PORT, 
9069);
                        config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
                        
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
+                       
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + 
NUM_TMS));
 
                        cluster = new TestingCluster(config, false);
                        cluster.start(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
new file mode 100644
index 0000000..1fd7012
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Tests general behavior of the {@link AbstractServerBase}.
+ */
+public class AbstractServerTest {
+
+       @Rule
+       public ExpectedException expectedEx = ExpectedException.none();
+
+       /**
+        * Tests that in case of port collision, a FlinkRuntimeException is 
thrown
+        * with a specific message.
+        */
+       @Test
+       public void testServerInitializationFailure() throws Throwable {
+
+               // the expected exception along with the adequate message
+               expectedEx.expect(FlinkRuntimeException.class);
+               expectedEx.expectMessage("Unable to start server Test Server 2. 
All ports in provided range are occupied.");
+
+               TestServer server1 = null;
+               TestServer server2 = null;
+               try {
+
+                       server1 = startServer("Test Server 1", 7777);
+                       Assert.assertEquals(7777L, 
server1.getServerAddress().getPort());
+
+                       server2 = startServer("Test Server 2", 7777);
+               } finally {
+
+                       if (server1 != null) {
+                               server1.shutdown();
+                       }
+
+                       if (server2 != null) {
+                               server2.shutdown();
+                       }
+               }
+       }
+
+       /**
+        * Tests that in case of port collision and big enough port range,
+        * the server will try to bind to the next port in the range.
+        */
+       @Test
+       public void testPortRangeSuccess() throws Throwable {
+               TestServer server1 = null;
+               TestServer server2 = null;
+               Client<TestMessage, TestMessage> client = null;
+
+               try {
+                       server1 = startServer("Test Server 1", 7777, 7778, 
7779);
+                       Assert.assertEquals(7777L, 
server1.getServerAddress().getPort());
+
+                       server2 = startServer("Test Server 2", 7777, 7778, 
7779);
+                       Assert.assertEquals(7778L, 
server2.getServerAddress().getPort());
+
+                       client = new Client<>(
+                                       "Test Client",
+                                       1,
+                                       new MessageSerializer<>(new 
TestMessage.TestMessageDeserializer(), new 
TestMessage.TestMessageDeserializer()),
+                                       new DisabledKvStateRequestStats());
+
+                       TestMessage response1 = 
client.sendRequest(server1.getServerAddress(), new TestMessage("ping")).join();
+                       Assert.assertEquals(server1.getServerName() + "-ping", 
response1.getMessage());
+
+                       TestMessage response2 = 
client.sendRequest(server2.getServerAddress(), new TestMessage("pong")).join();
+                       Assert.assertEquals(server2.getServerName() + "-pong", 
response2.getMessage());
+               } finally {
+
+                       if (server1 != null) {
+                               server1.shutdown();
+                       }
+
+                       if (server2 != null) {
+                               server2.shutdown();
+                       }
+
+                       if (client != null) {
+                               client.shutdown();
+                       }
+               }
+       }
+
+       /**
+        * Initializes a {@link TestServer} with the given port range.
+        * @param serverName the name of the server.
+        * @param ports a range of ports.
+        * @return A test server with the given name.
+        */
+       private TestServer startServer(String serverName, int... ports) throws 
Throwable {
+               List<Integer> portList = new ArrayList<>(ports.length);
+               for (int p : ports) {
+                       portList.add(p);
+               }
+
+               final TestServer server = new TestServer(serverName, 
portList.iterator());
+               server.start();
+               return server;
+       }
+
+       /**
+        * A server that receives a {@link TestMessage test message} and 
returns another test
+        * message containing the same string as the request with the name of 
the server prepended.
+        */
+       private class TestServer extends AbstractServerBase<TestMessage, 
TestMessage> {
+
+               protected TestServer(String name, Iterator<Integer> bindPort) 
throws UnknownHostException {
+                       super(name, InetAddress.getLocalHost(), bindPort, 1, 1);
+               }
+
+               @Override
+               public AbstractServerHandler<TestMessage, TestMessage> 
initializeHandler() {
+                       return new AbstractServerHandler<TestMessage, 
TestMessage>(
+                                       this,
+                                       new MessageSerializer<>(new 
TestMessage.TestMessageDeserializer(), new 
TestMessage.TestMessageDeserializer()),
+                                       new DisabledKvStateRequestStats()) {
+
+                               @Override
+                               public CompletableFuture<TestMessage> 
handleRequest(long requestId, TestMessage request) {
+                                       TestMessage response = new 
TestMessage(getServerName() + '-' + request.getMessage());
+                                       return 
CompletableFuture.completedFuture(response);
+                               }
+
+                               @Override
+                               public void shutdown() {
+                                       // do nothing
+                               }
+                       };
+               }
+       }
+
+       /**
+        * Message with a string as payload.
+        */
+       private static class TestMessage extends MessageBody {
+
+               private final String message;
+
+               TestMessage(String message) {
+                       this.message = Preconditions.checkNotNull(message);
+               }
+
+               public String getMessage() {
+                       return message;
+               }
+
+               @Override
+               public byte[] serialize() {
+                       byte[] content = 
message.getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+                       // message size + 4 for the length itself
+                       return ByteBuffer.allocate(content.length + 
Integer.BYTES)
+                                       .putInt(content.length)
+                                       .put(content)
+                                       .array();
+               }
+
+               /**
+                * The deserializer for our {@link TestMessage test messages}.
+                */
+               public static class TestMessageDeserializer implements 
MessageDeserializer<TestMessage> {
+
+                       @Override
+                       public TestMessage deserializeMessage(ByteBuf buf) {
+                               int length = buf.readInt();
+                               String message = "";
+                               if (length > 0) {
+                                       byte[] name = new byte[length];
+                                       buf.readBytes(name);
+                                       message = new String(name, 
ConfigConstants.DEFAULT_CHARSET);
+                               }
+                               return new TestMessage(message);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index b6f855e..53b1592 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -566,7 +566,7 @@ public class ClientTest {
         * that all ongoing requests are failed.
         */
        @Test
-       public void testClientServerIntegration() throws Exception {
+       public void testClientServerIntegration() throws Throwable {
                // Config
                final int numServers = 2;
                final int numServerEventLoopThreads = 2;

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 944349ee..97e999d 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -63,7 +63,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -93,7 +92,8 @@ public class KvStateServerHandlerTest extends TestLogger {
                                        1,
                                        new KvStateRegistry(),
                                        new DisabledKvStateRequestStats());
-               } catch (UnknownHostException e) {
+                       testServer.start();
+               } catch (Throwable e) {
                        e.printStackTrace();
                }
        }
@@ -376,7 +376,7 @@ public class KvStateServerHandlerTest extends TestLogger {
         * Tests the failure response on a rejected execution, because the 
query executor has been closed.
         */
        @Test
-       public void testQueryExecutorShutDown() throws Exception {
+       public void testQueryExecutorShutDown() throws Throwable {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
@@ -388,6 +388,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                                new KvStateRegistry(),
                                new DisabledKvStateRequestStats());
 
+               localTestServer.start();
                localTestServer.shutdown();
                assertTrue(localTestServer.isExecutorShutdown());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index b7f489a..16f80c6 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -86,7 +86,7 @@ public class KvStateServerTest {
         * Tests a simple successful query via a SocketChannel.
         */
        @Test
-       public void testSimpleRequest() throws Exception {
+       public void testSimpleRequest() throws Throwable {
                KvStateServerImpl server = null;
                Bootstrap bootstrap = null;
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d6c5d75..cb43fbf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -65,10 +65,10 @@ public class NetworkEnvironment {
        private final TaskEventDispatcher taskEventDispatcher;
 
        /** Server for {@link InternalKvState} requests. */
-       private final KvStateServer kvStateServer;
+       private KvStateServer kvStateServer;
 
        /** Proxy for the queryable state client. */
-       private final KvStateClientProxy kvStateProxy;
+       private KvStateClientProxy kvStateProxy;
 
        /** Registry for {@link InternalKvState} instances. */
        private final KvStateRegistry kvStateRegistry;
@@ -311,7 +311,9 @@ public class NetworkEnvironment {
                                try {
                                        kvStateServer.start();
                                        LOG.info("Started Queryable State Data 
Server @ {}", kvStateServer.getServerAddress());
-                               } catch (InterruptedException ie) {
+                               } catch (Throwable ie) {
+                                       kvStateServer.shutdown();
+                                       kvStateServer = null;
                                        throw new IOException("Failed to start 
the Queryable State Data Server.", ie);
                                }
                        }
@@ -320,7 +322,9 @@ public class NetworkEnvironment {
                                try {
                                        kvStateProxy.start();
                                        LOG.info("Started the Queryable State 
Client Proxy @ {}", kvStateProxy.getServerAddress());
-                               } catch (InterruptedException ie) {
+                               } catch (Throwable ie) {
+                                       kvStateProxy.shutdown();
+                                       kvStateProxy = null;
                                        throw new IOException("Failed to start 
the Queryable State Client Proxy.", ie);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
index 81727fc..17ffe0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
@@ -33,7 +33,7 @@ public interface KvStateServer {
 
 
        /** Starts the server. */
-       void start() throws InterruptedException;
+       void start() throws Throwable;
 
        /** Shuts down the server and all related thread pools. */
        void shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
index 8f66734..f6a8627 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
+import java.util.Iterator;
 
 /**
  * Utility class to initialize entities used in queryable state.
@@ -40,7 +41,9 @@ public final class QueryableStateUtils {
         * receiving requests from the external (to the cluster) client and 
forwarding them internally.
         *
         * @param address the address to bind to.
-        * @param port the port to listen to.
+        * @param ports the range of ports the proxy will attempt to listen to
+        *                 (see {@link 
org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE
+        *                 QueryableStateOptions.PROXY_PORT_RANGE}).
         * @param eventLoopThreads the number of threads to be used to process 
incoming requests.
         * @param queryThreads the number of threads to be used to send the 
actual state.
         * @param stats statistics to be gathered about the incoming requests.
@@ -48,7 +51,7 @@ public final class QueryableStateUtils {
         */
        public static KvStateClientProxy createKvStateClientProxy(
                        final InetAddress address,
-                       final int port,
+                       final Iterator<Integer> ports,
                        final int eventLoopThreads,
                        final int queryThreads,
                        final KvStateRequestStats stats) {
@@ -64,11 +67,11 @@ public final class QueryableStateUtils {
                        Class<? extends KvStateClientProxy> clazz = 
Class.forName(classname).asSubclass(KvStateClientProxy.class);
                        Constructor<? extends KvStateClientProxy> constructor = 
clazz.getConstructor(
                                        InetAddress.class,
-                                       Integer.class,
+                                       Iterator.class,
                                        Integer.class,
                                        Integer.class,
                                        KvStateRequestStats.class);
-                       return constructor.newInstance(address, port, 
eventLoopThreads, queryThreads, stats);
+                       return constructor.newInstance(address, ports, 
eventLoopThreads, queryThreads, stats);
                } catch (ClassNotFoundException e) {
                        LOG.warn("Could not load Queryable State Client Proxy. 
" +
                                        "Probable reason: flink-queryable-state 
is not in the classpath");

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index fed5fc0..0c3ef0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import java.util.Iterator;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -27,19 +29,19 @@ public class QueryableStateConfiguration {
 
        private final boolean enabled;
 
-       private final int port;
+       private final Iterator<Integer> proxyPortRange;
 
        private final int numServerThreads;
 
        private final int numQueryThreads;
 
-       public QueryableStateConfiguration(boolean enabled, int port, int 
numServerThreads, int numQueryThreads) {
-               checkArgument(port >= 0 && port < 65536, "queryable state 
server port out of range");
+       public QueryableStateConfiguration(boolean enabled, Iterator<Integer> 
proxyPortRange, int numServerThreads, int numQueryThreads) {
+               checkArgument(!enabled || (proxyPortRange != null && 
proxyPortRange.hasNext()));
                checkArgument(numServerThreads >= 0, "queryable state number of 
server threads must be zero or larger");
                checkArgument(numQueryThreads >= 0, "queryable state number of 
query threads must be zero or larger");
 
                this.enabled = enabled;
-               this.port = port;
+               this.proxyPortRange = proxyPortRange;
                this.numServerThreads = numServerThreads;
                this.numQueryThreads = numQueryThreads;
        }
@@ -49,15 +51,16 @@ public class QueryableStateConfiguration {
        /**
         * Returns whether queryable state is enabled.
         */
-       public boolean enabled() {
+       public boolean isEnabled() {
                return enabled;
        }
 
        /**
-        * Returns the port where the server should listen.
+        * Returns the port range where the queryable state client proxy can 
listen.
+        * See {@link 
org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE 
QueryableStateOptions.PROXY_PORT_RANGE}.
         */
-       public int port() {
-               return port;
+       public Iterator<Integer> ports() {
+               return proxyPortRange;
        }
 
        /**
@@ -82,7 +85,6 @@ public class QueryableStateConfiguration {
        public String toString() {
                return "QueryableStateConfiguration {" +
                                "enabled=" + enabled +
-                               ", port=" + port +
                                ", numServerThreads=" + numServerThreads +
                                ", numQueryThreads=" + numQueryThreads +
                                '}';
@@ -94,6 +96,6 @@ public class QueryableStateConfiguration {
         * Gets the configuration describing the queryable state as deactivated.
         */
        public static QueryableStateConfiguration disabled() {
-               return new QueryableStateConfiguration(false, 0, 0, 0);
+               return new QueryableStateConfiguration(false, null, 0, 0);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index cbf0d95..312622b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -360,7 +360,7 @@ public class TaskManagerServices {
                KvStateClientProxy kvClientProxy = null;
                KvStateServer kvStateServer = null;
 
-               if 
(taskManagerServicesConfiguration.getQueryableStateConfig().enabled()) {
+               if 
(taskManagerServicesConfiguration.getQueryableStateConfig().isEnabled()) {
                        QueryableStateConfiguration qsConfig = 
taskManagerServicesConfiguration.getQueryableStateConfig();
 
                        int numNetworkThreads = qsConfig.numServerThreads() == 
0 ?
@@ -371,7 +371,7 @@ public class TaskManagerServices {
 
                        kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
                                        
taskManagerServicesConfiguration.getTaskManagerAddress(),
-                                       qsConfig.port(),
+                                       qsConfig.ports(),
                                        numNetworkThreads,
                                        numQueryThreads,
                                        new DisabledKvStateRequestStats());

http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index e6643b7..31bfeff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.NetUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Iterator;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -189,7 +191,6 @@ public class TaskManagerServicesConfiguration {
                        remoteAddress,
                        slots);
 
-               // @Ufuk todo why was it like this before ???
                final QueryableStateConfiguration queryableStateConfig =
                                parseQueryableStateConfiguration(configuration);
 
@@ -415,10 +416,11 @@ public class TaskManagerServicesConfiguration {
                final boolean enabled = 
config.getBoolean(QueryableStateOptions.SERVER_ENABLE);
 
                if (enabled) {
-                       int port = 
config.getInteger(QueryableStateOptions.SERVER_PORT);
-                       int numNetworkThreads = 
config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
-                       int numQueryThreads = 
config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
-                       return new QueryableStateConfiguration(true, port, 
numNetworkThreads, numQueryThreads);
+                       final Iterator<Integer> ports = 
NetUtils.getPortRangeFromString(
+                                       
config.getString(QueryableStateOptions.PROXY_PORT_RANGE, "9069"));
+                       final int numNetworkThreads = 
config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
+                       final int numQueryThreads = 
config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
+                       return new QueryableStateConfiguration(true, ports, 
numNetworkThreads, numQueryThreads);
                }
                else {
                        return QueryableStateConfiguration.disabled();

Reply via email to