Repository: flink
Updated Branches:
  refs/heads/master 9772e03d7 -> 717a7dc81


[FLINK-5920][QS] Allow specification of port range for queryable state server.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/717a7dc8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/717a7dc8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/717a7dc8

Branch: refs/heads/master
Commit: 717a7dc81d066dc7d6e1a17099c0f5e1bc96b5d1
Parents: 5338f85
Author: kkloudas <kklou...@gmail.com>
Authored: Fri Oct 13 19:15:11 2017 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Wed Oct 18 08:40:28 2017 +0200

----------------------------------------------------------------------
 .../configuration/QueryableStateOptions.java    | 26 ++++++++++++++++++--
 .../client/proxy/KvStateClientProxyImpl.java    |  2 +-
 .../network/AbstractServerBase.java             | 25 -------------------
 .../server/KvStateServerImpl.java               | 11 +++++----
 .../itcases/HAAbstractQueryableStateITCase.java |  1 +
 .../NonHAAbstractQueryableStateITCase.java      |  1 +
 .../queryablestate/network/ClientTest.java      |  2 +-
 .../network/KvStateServerHandlerTest.java       |  5 ++--
 .../network/KvStateServerTest.java              |  4 ++-
 .../runtime/query/QueryableStateUtils.java      | 10 +++++---
 .../QueryableStateConfiguration.java            | 24 +++++++++++++++---
 .../taskexecutor/TaskManagerServices.java       |  4 +--
 .../TaskManagerServicesConfiguration.java       | 11 ++++++---
 13 files changed, 77 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index 2dd4cca..adba938 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -44,7 +44,7 @@ public class QueryableStateOptions {
         * machine.
         *
         * <p>Given this, and to avoid port clashes, the user can specify a 
port range and
-        * the proxy is going to bind to the first free port in that range.
+        * the proxy will bind to the first free port in that range.
         *
         * <p>The specified range can be:
         * <ol>
@@ -56,9 +56,31 @@ public class QueryableStateOptions {
         * <p><b>The default port is 9069.</b>
         */
        public static final ConfigOption<String> PROXY_PORT_RANGE =
-                       key("query.server.port")
+                       key("query.proxy.ports")
                        .defaultValue("9069");
 
+       /**
+        * The config parameter defining the server port range of the queryable 
state server.
+        *
+        * <p>A state server runs on each Task Manager, so many servers may run 
on the same
+        * machine.
+        *
+        * <p>Given this, and to avoid port clashes, the user can specify a 
port range and
+        * the server will bind to the first free port in that range.
+        *
+        * <p>The specified range can be:
+        * <ol>
+        *     <li>a port: "9123",
+        *     <li>a range of ports: "50100-50200", or
+        *     <li>a list of ranges and/or points: 
"50100-50200,50300-50400,51234"
+        * </ol>
+        *
+        * <p><b>The default port is 9067.</b>
+        */
+       public static final ConfigOption<String> SERVER_PORT_RANGE =
+                       key("query.server.ports")
+                       .defaultValue("9067");
+
        /** Number of network (event loop) threads for the KvState server (0 => 
#slots). */
        public static final ConfigOption<Integer> SERVER_NETWORK_THREADS =
                        key("query.server.network-threads")

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
index 2e0c287..196641d 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -66,7 +66,7 @@ public class KvStateClientProxyImpl extends 
AbstractServerBase<KvStateRequest, K
         * to the configured bind address.
         *
         * @param bindAddress the address to listen to.
-        * @param bindPortIterator the port to listen to.
+        * @param bindPortIterator the port range to try to bind to.
         * @param numEventLoopThreads number of event loop threads.
         * @param numQueryThreads number of query threads.
         * @param stats the statistics collector.

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 8df42f7..be852fb 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -104,31 +104,6 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
         *
         * @param serverName the name of the server
         * @param bindAddress address to bind to
-        * @param bindPort port to bind to
-        * @param numEventLoopThreads number of event loop threads
-        */
-       protected AbstractServerBase(
-                       final String serverName,
-                       final InetAddress bindAddress,
-                       final Integer bindPort,
-                       final Integer numEventLoopThreads,
-                       final Integer numQueryThreads) {
-               this(
-                               serverName,
-                               bindAddress,
-                               Collections.singleton(bindPort).iterator(),
-                               numEventLoopThreads,
-                               numQueryThreads
-               );
-       }
-
-       /**
-        * Creates the {@link AbstractServerBase}.
-        *
-        * <p>The server needs to be started via {@link #start()}.
-        *
-        * @param serverName the name of the server
-        * @param bindAddress address to bind to
         * @param bindPortIterator port to bind to
         * @param numEventLoopThreads number of event loop threads
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index 1673015..dfca915 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
+import java.util.Iterator;
 
 /**
  * The default implementation of the {@link KvStateServer}.
@@ -54,14 +55,14 @@ public class KvStateServerImpl extends 
AbstractServerBase<KvStateInternalRequest
         * Creates the state server.
         *
         * <p>The server is instantiated using reflection by the
-        * {@link 
org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress,
 int, int, int, KvStateRegistry, KvStateRequestStats)
-        * QueryableStateUtils.createKvStateServer(InetAddress, int, int, int, 
KvStateRegistry, KvStateRequestStats)}.
+        * {@link 
org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress,
 Iterator, int, int, KvStateRegistry, KvStateRequestStats)
+        * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, 
int, KvStateRegistry, KvStateRequestStats)}.
         *
         * <p>The server needs to be started via {@link #start()} in order to 
bind
         * to the configured bind address.
         *
         * @param bindAddress the address to listen to.
-        * @param bindPort the port to listen to.
+        * @param bindPortIterator the port range to try to bind to.
         * @param numEventLoopThreads number of event loop threads.
         * @param numQueryThreads number of query threads.
         * @param kvStateRegistry {@link KvStateRegistry} to query for state 
instances.
@@ -69,13 +70,13 @@ public class KvStateServerImpl extends 
AbstractServerBase<KvStateInternalRequest
         */
        public KvStateServerImpl(
                        final InetAddress bindAddress,
-                       final Integer bindPort,
+                       final Iterator<Integer> bindPortIterator,
                        final Integer numEventLoopThreads,
                        final Integer numQueryThreads,
                        final KvStateRegistry kvStateRegistry,
                        final KvStateRequestStats stats) {
 
-               super("Queryable State Server", bindAddress, bindPort, 
numEventLoopThreads, numQueryThreads);
+               super("Queryable State Server", bindAddress, bindPortIterator, 
numEventLoopThreads, numQueryThreads);
                this.stats = Preconditions.checkNotNull(stats);
                this.kvStateRegistry = 
Preconditions.checkNotNull(kvStateRegistry);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
index bcb6be4..a90b956 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
@@ -60,6 +60,7 @@ public abstract class HAAbstractQueryableStateITCase extends 
AbstractQueryableSt
                        
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
                        
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
                        
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + 
NUM_TMS));
+                       
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + 
NUM_TMS));
                        
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
                        
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
                        config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
index 55f1841..c258e70 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
@@ -50,6 +50,7 @@ public abstract class NonHAAbstractQueryableStateITCase 
extends AbstractQueryabl
                        config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
                        
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
                        
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + 
NUM_TMS));
+                       
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + 
NUM_TMS));
 
                        cluster = new TestingCluster(config, false);
                        cluster.start(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 53b1592..4023925 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -622,7 +622,7 @@ public class ClientTest {
                                serverStats[i] = new 
AtomicKvStateRequestStats();
                                server[i] = new KvStateServerImpl(
                                                InetAddress.getLocalHost(),
-                                               0,
+                                               
Collections.singletonList(0).iterator(),
                                                numServerEventLoopThreads,
                                                numServerQueryThreads,
                                                registry[i],

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 97e999d..217d0b5 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -63,6 +63,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -87,7 +88,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                try {
                        testServer = new KvStateServerImpl(
                                        InetAddress.getLocalHost(),
-                                       0,
+                                       Collections.singletonList(0).iterator(),
                                        1,
                                        1,
                                        new KvStateRegistry(),
@@ -382,7 +383,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
                KvStateServerImpl localTestServer = new KvStateServerImpl(
                                InetAddress.getLocalHost(),
-                               0,
+                               Collections.singletonList(0).iterator(),
                                1,
                                1,
                                new KvStateRegistry(),

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index 16f80c6..7abc84e 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -58,6 +58,7 @@ import org.junit.AfterClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.util.Collections;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -93,7 +94,8 @@ public class KvStateServerTest {
                        KvStateRegistry registry = new KvStateRegistry();
                        KvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-                       server = new 
KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
+                       server = new 
KvStateServerImpl(InetAddress.getLocalHost(),
+                                       
Collections.singletonList(0).iterator(), 1, 1, registry, stats);
                        server.start();
 
                        KvStateServerAddress serverAddress = 
server.getServerAddress();

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
index f6a8627..fa021df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
@@ -91,7 +91,9 @@ public final class QueryableStateUtils {
         * requested internal state to the {@link KvStateClientProxy client 
proxy}.
         *
         * @param address the address to bind to.
-        * @param port the port to listen to.
+        * @param ports the range of ports the state server will attempt to 
listen to
+        *                 (see {@link 
org.apache.flink.configuration.QueryableStateOptions#SERVER_PORT_RANGE
+        *                 QueryableStateOptions.SERVER_PORT_RANGE}).
         * @param eventLoopThreads the number of threads to be used to process 
incoming requests.
         * @param queryThreads the number of threads to be used to send the 
actual state.
         * @param kvStateRegistry the registry with the queryable state.
@@ -100,7 +102,7 @@ public final class QueryableStateUtils {
         */
        public static KvStateServer createKvStateServer(
                        final InetAddress address,
-                       final int port,
+                       final Iterator<Integer> ports,
                        final int eventLoopThreads,
                        final int queryThreads,
                        final KvStateRegistry kvStateRegistry,
@@ -118,12 +120,12 @@ public final class QueryableStateUtils {
                        Class<? extends KvStateServer> clazz = 
Class.forName(classname).asSubclass(KvStateServer.class);
                        Constructor<? extends KvStateServer> constructor = 
clazz.getConstructor(
                                        InetAddress.class,
-                                       Integer.class,
+                                       Iterator.class,
                                        Integer.class,
                                        Integer.class,
                                        KvStateRegistry.class,
                                        KvStateRequestStats.class);
-                       return constructor.newInstance(address, port, 
eventLoopThreads, queryThreads, kvStateRegistry, stats);
+                       return constructor.newInstance(address, ports, 
eventLoopThreads, queryThreads, kvStateRegistry, stats);
                } catch (ClassNotFoundException e) {
                        LOG.warn("Could not load Queryable State Server. " +
                                        "Probable reason: flink-queryable-state 
is not in the classpath");

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index 0c3ef0e..5e6b7c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -31,17 +31,27 @@ public class QueryableStateConfiguration {
 
        private final Iterator<Integer> proxyPortRange;
 
+       private final Iterator<Integer> qserverPortRange;
+
        private final int numServerThreads;
 
        private final int numQueryThreads;
 
-       public QueryableStateConfiguration(boolean enabled, Iterator<Integer> 
proxyPortRange, int numServerThreads, int numQueryThreads) {
+       public QueryableStateConfiguration(
+                       boolean enabled,
+                       Iterator<Integer> proxyPortRange,
+                       Iterator<Integer> qserverPortRange,
+                       int numServerThreads,
+                       int numQueryThreads) {
+
                checkArgument(!enabled || (proxyPortRange != null && 
proxyPortRange.hasNext()));
+               checkArgument(!enabled || (qserverPortRange != null && 
qserverPortRange.hasNext()));
                checkArgument(numServerThreads >= 0, "queryable state number of 
server threads must be zero or larger");
                checkArgument(numQueryThreads >= 0, "queryable state number of 
query threads must be zero or larger");
 
                this.enabled = enabled;
                this.proxyPortRange = proxyPortRange;
+               this.qserverPortRange = qserverPortRange;
                this.numServerThreads = numServerThreads;
                this.numQueryThreads = numQueryThreads;
        }
@@ -59,11 +69,19 @@ public class QueryableStateConfiguration {
         * Returns the port range where the queryable state client proxy can 
listen.
         * See {@link 
org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE 
QueryableStateOptions.PROXY_PORT_RANGE}.
         */
-       public Iterator<Integer> ports() {
+       public Iterator<Integer> getProxyPortRange() {
                return proxyPortRange;
        }
 
        /**
+        * Returns the port range where the queryable state server can 
listen.
+        * See {@link 
org.apache.flink.configuration.QueryableStateOptions#SERVER_PORT_RANGE 
QueryableStateOptions.SERVER_PORT_RANGE}.
+        */
+       public Iterator<Integer> getStateServerPortRange() {
+               return qserverPortRange;
+       }
+
+       /**
         * Returns the number of threads for the query server NIO event loop.
         * These threads only process network events and dispatch query 
requests to the query threads.
         */
@@ -96,6 +114,6 @@ public class QueryableStateConfiguration {
         * Gets the configuration describing the queryable state as deactivated.
         */
        public static QueryableStateConfiguration disabled() {
-               return new QueryableStateConfiguration(false, null, 0, 0);
+               return new QueryableStateConfiguration(false, null, null, 0, 0);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 312622b..1cc94d2a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -371,14 +371,14 @@ public class TaskManagerServices {
 
                        kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
                                        
taskManagerServicesConfiguration.getTaskManagerAddress(),
-                                       qsConfig.ports(),
+                                       qsConfig.getProxyPortRange(),
                                        numNetworkThreads,
                                        numQueryThreads,
                                        new DisabledKvStateRequestStats());
 
                        kvStateServer = QueryableStateUtils.createKvStateServer(
                                        
taskManagerServicesConfiguration.getTaskManagerAddress(),
-                                       0,
+                                       qsConfig.getStateServerPortRange(),
                                        numNetworkThreads,
                                        numQueryThreads,
                                        kvStateRegistry,

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 31bfeff..bfd37bc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -416,11 +416,16 @@ public class TaskManagerServicesConfiguration {
                final boolean enabled = 
config.getBoolean(QueryableStateOptions.SERVER_ENABLE);
 
                if (enabled) {
-                       final Iterator<Integer> ports = 
NetUtils.getPortRangeFromString(
-                                       
config.getString(QueryableStateOptions.PROXY_PORT_RANGE, "9069"));
+                       final Iterator<Integer> proxyPorts = 
NetUtils.getPortRangeFromString(
+                                       
config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
+                                                       
QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+                       final Iterator<Integer> serverPorts = 
NetUtils.getPortRangeFromString(
+                                       
config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
+                                                       
QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));
+
                        final int numNetworkThreads = 
config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
                        final int numQueryThreads = 
config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
-                       return new QueryableStateConfiguration(true, ports, 
numNetworkThreads, numQueryThreads);
+                       return new QueryableStateConfiguration(true, 
proxyPorts, serverPorts, numNetworkThreads, numQueryThreads);
                }
                else {
                        return QueryableStateConfiguration.disabled();

Reply via email to