Repository: flink Updated Branches: refs/heads/master 9772e03d7 -> 717a7dc81
[FLINK-5920][QS] Allow specification of port range for queryable state server. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/717a7dc8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/717a7dc8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/717a7dc8 Branch: refs/heads/master Commit: 717a7dc81d066dc7d6e1a17099c0f5e1bc96b5d1 Parents: 5338f85 Author: kkloudas <kklou...@gmail.com> Authored: Fri Oct 13 19:15:11 2017 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Wed Oct 18 08:40:28 2017 +0200 ---------------------------------------------------------------------- .../configuration/QueryableStateOptions.java | 26 ++++++++++++++++++-- .../client/proxy/KvStateClientProxyImpl.java | 2 +- .../network/AbstractServerBase.java | 25 ------------------- .../server/KvStateServerImpl.java | 11 +++++---- .../itcases/HAAbstractQueryableStateITCase.java | 1 + .../NonHAAbstractQueryableStateITCase.java | 1 + .../queryablestate/network/ClientTest.java | 2 +- .../network/KvStateServerHandlerTest.java | 5 ++-- .../network/KvStateServerTest.java | 4 ++- .../runtime/query/QueryableStateUtils.java | 10 +++++--- .../QueryableStateConfiguration.java | 24 +++++++++++++++--- .../taskexecutor/TaskManagerServices.java | 4 +-- .../TaskManagerServicesConfiguration.java | 11 ++++++--- 13 files changed, 77 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java index 2dd4cca..adba938 100644 --- 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java @@ -44,7 +44,7 @@ public class QueryableStateOptions { * machine. * * <p>Given this, and to avoid port clashes, the user can specify a port range and - * the proxy is going to bind to the first free port in that range. + * the proxy will bind to the first free port in that range. * * <p>The specified range can be: * <ol> @@ -56,9 +56,31 @@ public class QueryableStateOptions { * <p><b>The default port is 9069.</b> */ public static final ConfigOption<String> PROXY_PORT_RANGE = - key("query.server.port") + key("query.proxy.ports") .defaultValue("9069"); + /** + * The config parameter defining the server port range of the queryable state server. + * + * <p>A state server runs on each Task Manager, so many servers may run on the same + * machine. + * + * <p>Given this, and to avoid port clashes, the user can specify a port range and + * the server will bind to the first free port in that range. + * + * <p>The specified range can be: + * <ol> + * <li>a port: "9123", + * <li>a range of ports: "50100-50200", or + * <li>a list of ranges and/or points: "50100-50200,50300-50400,51234" + * </ol> + * + * <p><b>The default port is 9067.</b> + */ + public static final ConfigOption<String> SERVER_PORT_RANGE = + key("query.server.ports") + .defaultValue("9067"); + /** Number of network (event loop) threads for the KvState server (0 => #slots). 
*/ public static final ConfigOption<Integer> SERVER_NETWORK_THREADS = key("query.server.network-threads") http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java index 2e0c287..196641d 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java @@ -66,7 +66,7 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K * to the configured bind address. * * @param bindAddress the address to listen to. - * @param bindPortIterator the port to listen to. + * @param bindPortIterator the port range to try to bind to. * @param numEventLoopThreads number of event loop threads. * @param numQueryThreads number of query threads. * @param stats the statistics collector. 
http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java index 8df42f7..be852fb 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java @@ -104,31 +104,6 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M * * @param serverName the name of the server * @param bindAddress address to bind to - * @param bindPort port to bind to - * @param numEventLoopThreads number of event loop threads - */ - protected AbstractServerBase( - final String serverName, - final InetAddress bindAddress, - final Integer bindPort, - final Integer numEventLoopThreads, - final Integer numQueryThreads) { - this( - serverName, - bindAddress, - Collections.singleton(bindPort).iterator(), - numEventLoopThreads, - numQueryThreads - ); - } - - /** - * Creates the {@link AbstractServerBase}. - * - * <p>The server needs to be started via {@link #start()}. 
- * - * @param serverName the name of the server - * @param bindAddress address to bind to * @param bindPortIterator port to bind to * @param numEventLoopThreads number of event loop threads */ http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java index 1673015..dfca915 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; +import java.util.Iterator; /** * The default implementation of the {@link KvStateServer}. @@ -54,14 +55,14 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest * Creates the state server. * * <p>The server is instantiated using reflection by the - * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats) - * QueryableStateUtils.createKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)}. + * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats) + * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)}. 
* * <p>The server needs to be started via {@link #start()} in order to bind * to the configured bind address. * * @param bindAddress the address to listen to. - * @param bindPort the port to listen to. + * @param bindPortIterator the port range to try to bind to. * @param numEventLoopThreads number of event loop threads. * @param numQueryThreads number of query threads. * @param kvStateRegistry {@link KvStateRegistry} to query for state instances. @@ -69,13 +70,13 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest */ public KvStateServerImpl( final InetAddress bindAddress, - final Integer bindPort, + final Iterator<Integer> bindPortIterator, final Integer numEventLoopThreads, final Integer numQueryThreads, final KvStateRegistry kvStateRegistry, final KvStateRequestStats stats) { - super("Queryable State Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads); + super("Queryable State Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads); this.stats = Preconditions.checkNotNull(stats); this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry); } http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java index bcb6be4..a90b956 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java +++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java @@ -60,6 +60,7 @@ public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableSt config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + NUM_TMS)); + config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + NUM_TMS)); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java index 55f1841..c258e70 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java @@ -50,6 +50,7 @@ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryabl config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + 
(9069 + NUM_TMS)); + config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + NUM_TMS)); cluster = new TestingCluster(config, false); cluster.start(true); http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java index 53b1592..4023925 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -622,7 +622,7 @@ public class ClientTest { serverStats[i] = new AtomicKvStateRequestStats(); server[i] = new KvStateServerImpl( InetAddress.getLocalHost(), - 0, + Collections.singletonList(0).iterator(), numServerEventLoopThreads, numServerQueryThreads, registry[i], http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java index 97e999d..217d0b5 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java +++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java @@ -63,6 +63,7 @@ import org.junit.BeforeClass; import org.junit.Test; import java.net.InetAddress; +import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -87,7 +88,7 @@ public class KvStateServerHandlerTest extends TestLogger { try { testServer = new KvStateServerImpl( InetAddress.getLocalHost(), - 0, + Collections.singletonList(0).iterator(), 1, 1, new KvStateRegistry(), @@ -382,7 +383,7 @@ public class KvStateServerHandlerTest extends TestLogger { KvStateServerImpl localTestServer = new KvStateServerImpl( InetAddress.getLocalHost(), - 0, + Collections.singletonList(0).iterator(), 1, 1, new KvStateRegistry(), http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java index 16f80c6..7abc84e 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java @@ -58,6 +58,7 @@ import org.junit.AfterClass; import org.junit.Test; import java.net.InetAddress; +import java.util.Collections; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -93,7 +94,8 @@ public class KvStateServerTest { KvStateRegistry registry = new KvStateRegistry(); 
KvStateRequestStats stats = new AtomicKvStateRequestStats(); - server = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats); + server = new KvStateServerImpl(InetAddress.getLocalHost(), + Collections.singletonList(0).iterator(), 1, 1, registry, stats); server.start(); KvStateServerAddress serverAddress = server.getServerAddress(); http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java index f6a8627..fa021df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java @@ -91,7 +91,9 @@ public final class QueryableStateUtils { * requested internal state to the {@link KvStateClientProxy client proxy}. * * @param address the address to bind to. - * @param port the port to listen to. + * @param ports the range of ports the state server will attempt to listen to + * (see {@link org.apache.flink.configuration.QueryableStateOptions#SERVER_PORT_RANGE + * QueryableStateOptions.SERVER_PORT_RANGE}). * @param eventLoopThreads the number of threads to be used to process incoming requests. * @param queryThreads the number of threads to be used to send the actual state. * @param kvStateRegistry the registry with the queryable state. @@ -100,7 +102,7 @@ public final class QueryableStateUtils { */ public static KvStateServer createKvStateServer( final InetAddress address, - final int port, + final Iterator<Integer> ports, final int eventLoopThreads, final int queryThreads, final KvStateRegistry kvStateRegistry, @@ -118,12 +120,12 @@ public final class QueryableStateUtils { Class<? 
extends KvStateServer> clazz = Class.forName(classname).asSubclass(KvStateServer.class); Constructor<? extends KvStateServer> constructor = clazz.getConstructor( InetAddress.class, - Integer.class, + Iterator.class, Integer.class, Integer.class, KvStateRegistry.class, KvStateRequestStats.class); - return constructor.newInstance(address, port, eventLoopThreads, queryThreads, kvStateRegistry, stats); + return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, kvStateRegistry, stats); } catch (ClassNotFoundException e) { LOG.warn("Could not load Queryable State Server. " + "Probable reason: flink-queryable-state is not in the classpath"); http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java index 0c3ef0e..5e6b7c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java @@ -31,17 +31,27 @@ public class QueryableStateConfiguration { private final Iterator<Integer> proxyPortRange; + private final Iterator<Integer> qserverPortRange; + private final int numServerThreads; private final int numQueryThreads; - public QueryableStateConfiguration(boolean enabled, Iterator<Integer> proxyPortRange, int numServerThreads, int numQueryThreads) { + public QueryableStateConfiguration( + boolean enabled, + Iterator<Integer> proxyPortRange, + Iterator<Integer> qserverPortRange, + int numServerThreads, + int numQueryThreads) { + checkArgument(!enabled || (proxyPortRange != null && proxyPortRange.hasNext())); + 
checkArgument(!enabled || (qserverPortRange != null && qserverPortRange.hasNext())); checkArgument(numServerThreads >= 0, "queryable state number of server threads must be zero or larger"); checkArgument(numQueryThreads >= 0, "queryable state number of query threads must be zero or larger"); this.enabled = enabled; this.proxyPortRange = proxyPortRange; + this.qserverPortRange = qserverPortRange; this.numServerThreads = numServerThreads; this.numQueryThreads = numQueryThreads; } @@ -59,11 +69,19 @@ public class QueryableStateConfiguration { * Returns the port range where the queryable state client proxy can listen. * See {@link org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE QueryableStateOptions.PROXY_PORT_RANGE}. */ - public Iterator<Integer> ports() { + public Iterator<Integer> getProxyPortRange() { return proxyPortRange; } /** + * Returns the port range where the queryable state server can listen. + * See {@link org.apache.flink.configuration.QueryableStateOptions#SERVER_PORT_RANGE QueryableStateOptions.SERVER_PORT_RANGE}. + */ + public Iterator<Integer> getStateServerPortRange() { + return qserverPortRange; + } + + /** * Returns the number of threads for the query server NIO event loop. * These threads only process network events and dispatch query requests to the query threads. */ @@ -96,6 +114,6 @@ public class QueryableStateConfiguration { * Gets the configuration describing the queryable state as deactivated. 
*/ public static QueryableStateConfiguration disabled() { - return new QueryableStateConfiguration(false, null, 0, 0); + return new QueryableStateConfiguration(false, null, null, 0, 0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 312622b..1cc94d2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -371,14 +371,14 @@ public class TaskManagerServices { kvClientProxy = QueryableStateUtils.createKvStateClientProxy( taskManagerServicesConfiguration.getTaskManagerAddress(), - qsConfig.ports(), + qsConfig.getProxyPortRange(), numNetworkThreads, numQueryThreads, new DisabledKvStateRequestStats()); kvStateServer = QueryableStateUtils.createKvStateServer( taskManagerServicesConfiguration.getTaskManagerAddress(), - 0, + qsConfig.getStateServerPortRange(), numNetworkThreads, numQueryThreads, kvStateRegistry, http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 31bfeff..bfd37bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -416,11 +416,16 @@ public class TaskManagerServicesConfiguration { final boolean enabled = config.getBoolean(QueryableStateOptions.SERVER_ENABLE); if (enabled) { - final Iterator<Integer> ports = NetUtils.getPortRangeFromString( - config.getString(QueryableStateOptions.PROXY_PORT_RANGE, "9069")); + final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString( + config.getString(QueryableStateOptions.PROXY_PORT_RANGE, + QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); + final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString( + config.getString(QueryableStateOptions.SERVER_PORT_RANGE, + QueryableStateOptions.SERVER_PORT_RANGE.defaultValue())); + final int numNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS); final int numQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS); - return new QueryableStateConfiguration(true, ports, numNetworkThreads, numQueryThreads); + return new QueryableStateConfiguration(true, proxyPorts, serverPorts, numNetworkThreads, numQueryThreads); } else { return QueryableStateConfiguration.disabled();