[FLINK-7788][QS] Allow specification of port range for queryable state proxy.
The queryable state client proxy can now take a port range as argument so that if multiple proxies run on one machine, they can all start without problems. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5338f856 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5338f856 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5338f856 Branch: refs/heads/master Commit: 5338f856b42422189246130cc245754162fa9913 Parents: 9772e03 Author: kkloudas <kklou...@gmail.com> Authored: Fri Oct 6 12:08:47 2017 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Wed Oct 18 08:40:28 2017 +0200 ---------------------------------------------------------------------- .../configuration/QueryableStateOptions.java | 25 ++- .../client/proxy/KvStateClientProxyImpl.java | 13 +- .../network/AbstractServerBase.java | 196 ++++++++++++----- .../server/KvStateServerImpl.java | 4 +- .../itcases/AbstractQueryableStateITCase.java | 18 +- .../itcases/HAAbstractQueryableStateITCase.java | 3 +- .../NonHAAbstractQueryableStateITCase.java | 4 +- .../network/AbstractServerTest.java | 219 +++++++++++++++++++ .../queryablestate/network/ClientTest.java | 2 +- .../network/KvStateServerHandlerTest.java | 7 +- .../network/KvStateServerTest.java | 2 +- .../runtime/io/network/NetworkEnvironment.java | 12 +- .../flink/runtime/query/KvStateServer.java | 2 +- .../runtime/query/QueryableStateUtils.java | 11 +- .../QueryableStateConfiguration.java | 22 +- .../taskexecutor/TaskManagerServices.java | 4 +- .../TaskManagerServicesConfiguration.java | 12 +- 17 files changed, 450 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java index df850e9..2dd4cca 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java @@ -37,10 +37,27 @@ public class QueryableStateOptions { key("query.server.enable") .defaultValue(true); - /** Port to bind KvState server to (0 => pick random available port). */ - public static final ConfigOption<Integer> SERVER_PORT = + /** + * The config parameter defining the server port range of the queryable state proxy. + * + * <p>A proxy runs on each Task Manager, so many proxies may run on the same + * machine. + * + * <p>Given this, and to avoid port clashes, the user can specify a port range and + * the proxy is going to bind to the first free port in that range. + * + * <p>The specified range can be: + * <ol> + * <li>a port: "9123", + * <li>a range of ports: "50100-50200", or + * <li>a list of ranges and or points: "50100-50200,50300-50400,51234" + * </ol> + * + * <p><b>The default port is 9069.</b> + */ + public static final ConfigOption<String> PROXY_PORT_RANGE = key("query.server.port") - .defaultValue(9069); + .defaultValue("9069"); /** Number of network (event loop) threads for the KvState server (0 => #slots). */ public static final ConfigOption<Integer> SERVER_NETWORK_THREADS = @@ -73,7 +90,7 @@ public class QueryableStateOptions { // ------------------------------------------------------------------------ - /** Not intended to be instantiated */ + /** Not intended to be instantiated. */ private QueryableStateOptions() { } } http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java index bca80de..2e0c287 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.query.netty.KvStateRequestStats; import org.apache.flink.util.Preconditions; import java.net.InetAddress; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; /** @@ -58,26 +59,26 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K * Creates the Queryable State Client Proxy. * * <p>The server is instantiated using reflection by the - * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats) - * QueryableStateUtils.startKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats)}. + * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats) + * QueryableStateUtils.createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats)}. * * <p>The server needs to be started via {@link #start()} in order to bind * to the configured bind address. * * @param bindAddress the address to listen to. - * @param bindPort the port to listen to. + * @param bindPortIterator the port to listen to. * @param numEventLoopThreads number of event loop threads. * @param numQueryThreads number of query threads. * @param stats the statistics collector. */ public KvStateClientProxyImpl( final InetAddress bindAddress, - final Integer bindPort, + final Iterator<Integer> bindPortIterator, final Integer numEventLoopThreads, final Integer numQueryThreads, final KvStateRequestStats stats) { - super("Queryable State Proxy Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads); + super("Queryable State Proxy Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads); Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); this.queryExecutorThreads = numQueryThreads; this.stats = Preconditions.checkNotNull(stats); @@ -89,7 +90,7 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K } @Override - public void start() throws InterruptedException { + public void start() throws Throwable { super.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java index 4bf8e98..8df42f7 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java @@ -23,11 +23,12 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.queryablestate.network.messages.MessageBody; import org.apache.flink.runtime.io.network.netty.NettyBufferPool; import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; @@ -40,8 +41,12 @@ import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandle import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -65,13 +70,26 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M /** AbstractServerBase config: high water mark. */ private static final int HIGH_WATER_MARK = 32 * 1024; + /** The name of the server, useful for debugging. */ private final String serverName; + /** The {@link InetAddress address} to listen to. */ + private final InetAddress bindAddress; + + /** A port range on which to try to connect. */ + private final Set<Integer> bindPortRange; + + /** The number of threads to be allocated to the event loop. */ + private final int numEventLoopThreads; + + /** The number of threads to be used for query serving. */ + private final int numQueryThreads; + /** Netty's ServerBootstrap. */ - private final ServerBootstrap bootstrap; + private ServerBootstrap bootstrap; /** Query executor thread pool. */ - private final ExecutorService queryExecutor; + private ExecutorService queryExecutor; /** Address of this server. */ private KvStateServerAddress serverAddress; @@ -82,12 +100,11 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M /** * Creates the {@link AbstractServerBase}. * - * <p>The server needs to be started via {@link #start()} in order to bind - * to the configured bind address. + * <p>The server needs to be started via {@link #start()}. * * @param serverName the name of the server * @param bindAddress address to bind to - * @param bindPort port to bind to (random port if 0) + * @param bindPort port to bind to * @param numEventLoopThreads number of event loop threads */ protected AbstractServerBase( @@ -96,90 +113,166 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M final Integer bindPort, final Integer numEventLoopThreads, final Integer numQueryThreads) { + this( + serverName, + bindAddress, + Collections.singleton(bindPort).iterator(), + numEventLoopThreads, + numQueryThreads + ); + } + + /** + * Creates the {@link AbstractServerBase}. + * + * <p>The server needs to be started via {@link #start()}. + * + * @param serverName the name of the server + * @param bindAddress address to bind to + * @param bindPortIterator port to bind to + * @param numEventLoopThreads number of event loop threads + */ + protected AbstractServerBase( + final String serverName, + final InetAddress bindAddress, + final Iterator<Integer> bindPortIterator, + final Integer numEventLoopThreads, + final Integer numQueryThreads) { - Preconditions.checkNotNull(bindAddress); - Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, "Port " + bindPort + " out of valid range (0-65536)."); + Preconditions.checkNotNull(bindPortIterator); Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads."); Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); this.serverName = Preconditions.checkNotNull(serverName); - this.queryExecutor = createQueryExecutor(numQueryThreads); - - final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads); - - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink " + serverName + " EventLoop Thread %d") - .build(); - - final NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); - - bootstrap = new ServerBootstrap() - // Bind address and port - .localAddress(bindAddress, bindPort) - // NIO server channels - .group(nioGroup) - .channel(NioServerSocketChannel.class) - // AbstractServerBase channel Options - .option(ChannelOption.ALLOCATOR, bufferPool) - // Child channel options - .childOption(ChannelOption.ALLOCATOR, bufferPool) - .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK) - .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK); + this.bindAddress = Preconditions.checkNotNull(bindAddress); + this.numEventLoopThreads = numEventLoopThreads; + this.numQueryThreads = numQueryThreads; + + this.bindPortRange = new HashSet<>(); + while (bindPortIterator.hasNext()) { + int port = bindPortIterator.next(); + Preconditions.checkArgument(port >= 0 && port <= 65535, + "Invalid port configuration. Port must be between 0 and 65535, but was " + port + "."); + bindPortRange.add(port); + } } /** * Creates a thread pool for the query execution. - * - * @param numQueryThreads Number of query threads. * @return Thread pool for query execution */ - private ExecutorService createQueryExecutor(int numQueryThreads) { + private ExecutorService createQueryExecutor() { ThreadFactory threadFactory = new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("Flink " + getServerName() + " Thread %d") .build(); - return Executors.newFixedThreadPool(numQueryThreads, threadFactory); } + /** + * Returns the thread-pool responsible for processing incoming requests. + */ protected ExecutorService getQueryExecutor() { return queryExecutor; } + /** + * Gets the name of the server. This is useful for debugging. + * @return The name of the server. + */ public String getServerName() { return serverName; } + /** + * Returns the {@link AbstractServerHandler handler} to be used for + * serving the incoming requests. + */ public abstract AbstractServerHandler<REQ, RESP> initializeHandler(); /** + * Returns the address of this server. + * + * @return AbstractServerBase address + * @throws IllegalStateException If server has not been started yet + */ + public KvStateServerAddress getServerAddress() { + Preconditions.checkState(serverAddress != null, "Server " + serverName + " has not been started."); + return serverAddress; + } + + /** * Starts the server by binding to the configured bind address (blocking). - * @throws InterruptedException If interrupted during the bind operation + * @throws Exception If something goes wrong during the bind operation. */ - public void start() throws InterruptedException { + public void start() throws Throwable { Preconditions.checkState(serverAddress == null, - "Server " + serverName + " has already been started @ " + serverAddress + '.'); + "Server " + serverName + " already running @ " + serverAddress + '.'); - this.handler = initializeHandler(); - bootstrap.childHandler(new ServerChannelInitializer<>(handler)); - - Channel channel = bootstrap.bind().sync().channel(); - InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); - serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); + Iterator<Integer> portIterator = bindPortRange.iterator(); + while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {} - LOG.info("Started server {} @ {}", serverName, serverAddress); + if (serverAddress != null) { + LOG.info("Started server {} @ {}.", serverName, serverAddress); + } else { + LOG.info("Unable to start server {}. All ports in provided range are occupied.", serverName); + throw new FlinkRuntimeException("Unable to start server " + serverName + ". All ports in provided range are occupied."); + } } /** - * Returns the address of this server. + * Tries to start the server at the provided port. * - * @return AbstractServerBase address - * @throws IllegalStateException If server has not been started yet + * <p>This, in conjunction with {@link #start()}, try to start the + * server on a free port among the port range provided at the constructor. + * + * @param port the port to try to bind the server to. + * @throws Exception If something goes wrong during the bind operation. */ - public KvStateServerAddress getServerAddress() { - Preconditions.checkState(serverAddress != null, "Server " + serverName + " has not been started."); - return serverAddress; + private boolean attemptToBind(final int port) throws Throwable { + LOG.debug("Attempting to start server {} on port {}.", serverName, port); + + this.queryExecutor = createQueryExecutor(); + this.handler = initializeHandler(); + + final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads); + + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Flink " + serverName + " EventLoop Thread %d") + .build(); + + final NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); + + this.bootstrap = new ServerBootstrap() + .localAddress(bindAddress, port) + .group(nioGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.ALLOCATOR, bufferPool) + .childOption(ChannelOption.ALLOCATOR, bufferPool) + .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK) + .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK) + .childHandler(new ServerChannelInitializer<>(handler)); + + try { + final ChannelFuture future = bootstrap.bind().sync(); + if (future.isSuccess()) { + final InetSocketAddress localAddress = (InetSocketAddress) future.channel().localAddress(); + serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); + return true; + } + + // the following throw is to bypass Netty's "optimization magic" + // and catch the bind exception. + // the exception is thrown by the sync() call above. + + throw future.cause(); + } catch (BindException e) { + LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage()); + shutdown(); + } + // any other type of exception we let it bubble up. + return false; } /** @@ -190,6 +283,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M if (handler != null) { handler.shutdown(); + handler = null; } if (queryExecutor != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java index b4c548a..1673015 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java @@ -55,7 +55,7 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest * * <p>The server is instantiated using reflection by the * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats) - * QueryableStateUtils.startKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)}. + * QueryableStateUtils.createKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)}. * * <p>The server needs to be started via {@link #start()} in order to bind * to the configured bind address. @@ -94,7 +94,7 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest } @Override - public void start() throws InterruptedException { + public void start() throws Throwable { super.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java index 7ff4ec6..a096f55 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java @@ -147,7 +147,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final QueryableStateClient client = new QueryableStateClient( "localhost", - cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); JobID jobId = null; @@ -371,7 +371,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final QueryableStateClient client = new QueryableStateClient( "localhost", - cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); JobID jobId = null; try { @@ -435,7 +435,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final QueryableStateClient client = new QueryableStateClient( "localhost", - cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); JobID jobId = null; try { @@ -600,7 +600,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final QueryableStateClient client = new QueryableStateClient( "localhost", - cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); JobID jobId = null; try { @@ -698,7 +698,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final QueryableStateClient client = new QueryableStateClient( "localhost", - cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); JobID jobId = null; try { @@ -764,7 +764,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final QueryableStateClient client = new QueryableStateClient( "localhost", - cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); JobID jobId = null; try { @@ -774,7 +774,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); DataStream<Tuple2<Integer, Long>> source = env .addSource(new TestAscendingValueSource(numElements)); @@ -861,7 +861,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final QueryableStateClient client = new QueryableStateClient( "localhost", - cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); JobID jobId = null; try { @@ -871,7 +871,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); DataStream<Tuple2<Integer, Long>> source = env .addSource(new TestAscendingValueSource(numElements)); http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java index a2a9678..bcb6be4 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java @@ -39,7 +39,7 @@ import static org.junit.Assert.fail; public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { private static final int NUM_JMS = 2; - private static final int NUM_TMS = 1; + private static final int NUM_TMS = 2; private static final int NUM_SLOTS_PER_TM = 4; private static TestingServer zkServer; @@ -59,6 +59,7 @@ public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableSt config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); + config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + NUM_TMS)); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java index 1173d0d..55f1841 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java @@ -36,7 +36,7 @@ import static org.junit.Assert.fail; */ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { - private static final int NUM_TMS = 1; + private static final int NUM_TMS = 2; private static final int NUM_SLOTS_PER_TM = 4; @BeforeClass @@ -47,9 +47,9 @@ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryabl config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); - config.setInteger(QueryableStateOptions.SERVER_PORT, 9069); config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); + config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + NUM_TMS)); cluster = new TestingCluster(config, false); cluster.start(true); http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java new file mode 100644 index 0000000..1fd7012 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.queryablestate.network.messages.MessageDeserializer; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Tests general behavior of the {@link AbstractServerBase}. + */ +public class AbstractServerTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + /** + * Tests that in case of port collision, a FlinkRuntimeException is thrown + * with a specific message. + */ + @Test + public void testServerInitializationFailure() throws Throwable { + + // the expected exception along with the adequate message + expectedEx.expect(FlinkRuntimeException.class); + expectedEx.expectMessage("Unable to start server Test Server 2. All ports in provided range are occupied."); + + TestServer server1 = null; + TestServer server2 = null; + try { + + server1 = startServer("Test Server 1", 7777); + Assert.assertEquals(7777L, server1.getServerAddress().getPort()); + + server2 = startServer("Test Server 2", 7777); + } finally { + + if (server1 != null) { + server1.shutdown(); + } + + if (server2 != null) { + server2.shutdown(); + } + } + } + + /** + * Tests that in case of port collision and big enough port range, + * the server will try to bind to the next port in the range. + */ + @Test + public void testPortRangeSuccess() throws Throwable { + TestServer server1 = null; + TestServer server2 = null; + Client<TestMessage, TestMessage> client = null; + + try { + server1 = startServer("Test Server 1", 7777, 7778, 7779); + Assert.assertEquals(7777L, server1.getServerAddress().getPort()); + + server2 = startServer("Test Server 2", 7777, 7778, 7779); + Assert.assertEquals(7778L, server2.getServerAddress().getPort()); + + client = new Client<>( + "Test Client", + 1, + new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()), + new DisabledKvStateRequestStats()); + + TestMessage response1 = client.sendRequest(server1.getServerAddress(), new TestMessage("ping")).join(); + Assert.assertEquals(server1.getServerName() + "-ping", response1.getMessage()); + + TestMessage response2 = client.sendRequest(server2.getServerAddress(), new TestMessage("pong")).join(); + Assert.assertEquals(server2.getServerName() + "-pong", response2.getMessage()); + } finally { + + if (server1 != null) { + server1.shutdown(); + } + + if (server2 != null) { + server2.shutdown(); + } + + if (client != null) { + client.shutdown(); + } + } + } + + /** + * Initializes a {@link TestServer} with the given port range. + * @param serverName the name of the server. + * @param ports a range of ports. + * @return A test server with the given name. + */ + private TestServer startServer(String serverName, int... ports) throws Throwable { + List<Integer> portList = new ArrayList<>(ports.length); + for (int p : ports) { + portList.add(p); + } + + final TestServer server = new TestServer(serverName, portList.iterator()); + server.start(); + return server; + } + + /** + * A server that receives a {@link TestMessage test message} and returns another test + * message containing the same string as the request with the name of the server prepended. + */ + private class TestServer extends AbstractServerBase<TestMessage, TestMessage> { + + protected TestServer(String name, Iterator<Integer> bindPort) throws UnknownHostException { + super(name, InetAddress.getLocalHost(), bindPort, 1, 1); + } + + @Override + public AbstractServerHandler<TestMessage, TestMessage> initializeHandler() { + return new AbstractServerHandler<TestMessage, TestMessage>( + this, + new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()), + new DisabledKvStateRequestStats()) { + + @Override + public CompletableFuture<TestMessage> handleRequest(long requestId, TestMessage request) { + TestMessage response = new TestMessage(getServerName() + '-' + request.getMessage()); + return CompletableFuture.completedFuture(response); + } + + @Override + public void shutdown() { + // do nothing + } + }; + } + } + + /** + * Message with a string as payload. + */ + private static class TestMessage extends MessageBody { + + private final String message; + + TestMessage(String message) { + this.message = Preconditions.checkNotNull(message); + } + + public String getMessage() { + return message; + } + + @Override + public byte[] serialize() { + byte[] content = message.getBytes(ConfigConstants.DEFAULT_CHARSET); + + // message size + 4 for the length itself + return ByteBuffer.allocate(content.length + Integer.BYTES) + .putInt(content.length) + .put(content) + .array(); + } + + /** + * The deserializer for our {@link TestMessage test messages}. + */ + public static class TestMessageDeserializer implements MessageDeserializer<TestMessage> { + + @Override + public TestMessage deserializeMessage(ByteBuf buf) { + int length = buf.readInt(); + String message = ""; + if (length > 0) { + byte[] name = new byte[length]; + buf.readBytes(name); + message = new String(name, ConfigConstants.DEFAULT_CHARSET); + } + return new TestMessage(message); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java index b6f855e..53b1592 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -566,7 +566,7 @@ public class ClientTest { * that all ongoing requests are failed. */ @Test - public void testClientServerIntegration() throws Exception { + public void testClientServerIntegration() throws Throwable { // Config final int numServers = 2; final int numServerEventLoopThreads = 2; http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java index 944349ee..97e999d 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java @@ -63,7 +63,6 @@ import org.junit.BeforeClass; import org.junit.Test; import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -93,7 +92,8 @@ public class KvStateServerHandlerTest extends TestLogger { 1, new KvStateRegistry(), new DisabledKvStateRequestStats()); - } catch (UnknownHostException e) { + testServer.start(); + } catch (Throwable e) { e.printStackTrace(); } } @@ -376,7 +376,7 @@ public class KvStateServerHandlerTest extends TestLogger { * Tests the failure response on a rejected execution, because the query executor has been closed. */ @Test - public void testQueryExecutorShutDown() throws Exception { + public void testQueryExecutorShutDown() throws Throwable { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); @@ -388,6 +388,7 @@ public class KvStateServerHandlerTest extends TestLogger { new KvStateRegistry(), new DisabledKvStateRequestStats()); + localTestServer.start(); localTestServer.shutdown(); assertTrue(localTestServer.isExecutorShutdown()); http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java index b7f489a..16f80c6 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java @@ -86,7 +86,7 @@ public class KvStateServerTest { * Tests a simple successful query via a SocketChannel. */ @Test - public void testSimpleRequest() throws Exception { + public void testSimpleRequest() throws Throwable { KvStateServerImpl server = null; Bootstrap bootstrap = null; try { http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index d6c5d75..cb43fbf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -65,10 +65,10 @@ public class NetworkEnvironment { private final TaskEventDispatcher taskEventDispatcher; /** Server for {@link InternalKvState} requests. */ - private final KvStateServer kvStateServer; + private KvStateServer kvStateServer; /** Proxy for the queryable state client. */ - private final KvStateClientProxy kvStateProxy; + private KvStateClientProxy kvStateProxy; /** Registry for {@link InternalKvState} instances. */ private final KvStateRegistry kvStateRegistry; @@ -311,7 +311,9 @@ public class NetworkEnvironment { try { kvStateServer.start(); LOG.info("Started Queryable State Data Server @ {}", kvStateServer.getServerAddress()); - } catch (InterruptedException ie) { + } catch (Throwable ie) { + kvStateServer.shutdown(); + kvStateServer = null; throw new IOException("Failed to start the Queryable State Data Server.", ie); } } @@ -320,7 +322,9 @@ public class NetworkEnvironment { try { kvStateProxy.start(); LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress()); - } catch (InterruptedException ie) { + } catch (Throwable ie) { + kvStateProxy.shutdown(); + kvStateProxy = null; throw new IOException("Failed to start the Queryable State Client Proxy.", ie); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java index 81727fc..17ffe0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java @@ -33,7 +33,7 @@ public interface KvStateServer { /** Starts the server. */ - void start() throws InterruptedException; + void start() throws Throwable; /** Shuts down the server and all related thread pools. */ void shutdown(); http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java index 8f66734..f6a8627 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; +import java.util.Iterator; /** * Utility class to initialize entities used in queryable state. @@ -40,7 +41,9 @@ public final class QueryableStateUtils { * receiving requests from the external (to the cluster) client and forwarding them internally. * * @param address the address to bind to. - * @param port the port to listen to. + * @param ports the range of ports the proxy will attempt to listen to + * (see {@link org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE + * QueryableStateOptions.PROXY_PORT_RANGE}). * @param eventLoopThreads the number of threads to be used to process incoming requests. * @param queryThreads the number of threads to be used to send the actual state. * @param stats statistics to be gathered about the incoming requests. @@ -48,7 +51,7 @@ public final class QueryableStateUtils { */ public static KvStateClientProxy createKvStateClientProxy( final InetAddress address, - final int port, + final Iterator<Integer> ports, final int eventLoopThreads, final int queryThreads, final KvStateRequestStats stats) { @@ -64,11 +67,11 @@ public final class QueryableStateUtils { Class<? extends KvStateClientProxy> clazz = Class.forName(classname).asSubclass(KvStateClientProxy.class); Constructor<? extends KvStateClientProxy> constructor = clazz.getConstructor( InetAddress.class, - Integer.class, + Iterator.class, Integer.class, Integer.class, KvStateRequestStats.class); - return constructor.newInstance(address, port, eventLoopThreads, queryThreads, stats); + return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, stats); } catch (ClassNotFoundException e) { LOG.warn("Could not load Queryable State Client Proxy. " + "Probable reason: flink-queryable-state is not in the classpath"); http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java index fed5fc0..0c3ef0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.taskexecutor; +import java.util.Iterator; + import static org.apache.flink.util.Preconditions.checkArgument; /** @@ -27,19 +29,19 @@ public class QueryableStateConfiguration { private final boolean enabled; - private final int port; + private final Iterator<Integer> proxyPortRange; private final int numServerThreads; private final int numQueryThreads; - public QueryableStateConfiguration(boolean enabled, int port, int numServerThreads, int numQueryThreads) { - checkArgument(port >= 0 && port < 65536, "queryable state server port out of range"); + public QueryableStateConfiguration(boolean enabled, Iterator<Integer> proxyPortRange, int numServerThreads, int numQueryThreads) { + checkArgument(!enabled || (proxyPortRange != null && proxyPortRange.hasNext())); checkArgument(numServerThreads >= 0, "queryable state number of server threads must be zero or larger"); checkArgument(numQueryThreads >= 0, "queryable state number of query threads must be zero or larger"); this.enabled = enabled; - this.port = port; + this.proxyPortRange = proxyPortRange; this.numServerThreads = numServerThreads; this.numQueryThreads = numQueryThreads; } @@ -49,15 +51,16 @@ public class QueryableStateConfiguration { /** * Returns whether queryable state is enabled. */ - public boolean enabled() { + public boolean isEnabled() { return enabled; } /** - * Returns the port where the server should listen. + * Returns the port range where the queryable state client proxy can listen. + * See {@link org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE QueryableStateOptions.PROXY_PORT_RANGE}. */ - public int port() { - return port; + public Iterator<Integer> ports() { + return proxyPortRange; } /** @@ -82,7 +85,6 @@ public class QueryableStateConfiguration { public String toString() { return "QueryableStateConfiguration {" + "enabled=" + enabled + - ", port=" + port + ", numServerThreads=" + numServerThreads + ", numQueryThreads=" + numQueryThreads + '}'; @@ -94,6 +96,6 @@ public class QueryableStateConfiguration { * Gets the configuration describing the queryable state as deactivated. */ public static QueryableStateConfiguration disabled() { - return new QueryableStateConfiguration(false, 0, 0, 0); + return new QueryableStateConfiguration(false, null, 0, 0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index cbf0d95..312622b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -360,7 +360,7 @@ public class TaskManagerServices { KvStateClientProxy kvClientProxy = null; KvStateServer kvStateServer = null; - if (taskManagerServicesConfiguration.getQueryableStateConfig().enabled()) { + if (taskManagerServicesConfiguration.getQueryableStateConfig().isEnabled()) { QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig(); int numNetworkThreads = qsConfig.numServerThreads() == 0 ? @@ -371,7 +371,7 @@ public class TaskManagerServices { kvClientProxy = QueryableStateUtils.createKvStateClientProxy( taskManagerServicesConfiguration.getTaskManagerAddress(), - qsConfig.port(), + qsConfig.ports(), numNetworkThreads, numQueryThreads, new DisabledKvStateRequestStats()); http://git-wip-us.apache.org/repos/asf/flink/blob/5338f856/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index e6643b7..31bfeff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.util.MathUtils; +import org.apache.flink.util.NetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.Iterator; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -189,7 +191,6 @@ public class TaskManagerServicesConfiguration { remoteAddress, slots); - // @Ufuk todo why was it like this before ??? final QueryableStateConfiguration queryableStateConfig = parseQueryableStateConfiguration(configuration); @@ -415,10 +416,11 @@ public class TaskManagerServicesConfiguration { final boolean enabled = config.getBoolean(QueryableStateOptions.SERVER_ENABLE); if (enabled) { - int port = config.getInteger(QueryableStateOptions.SERVER_PORT); - int numNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS); - int numQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS); - return new QueryableStateConfiguration(true, port, numNetworkThreads, numQueryThreads); + final Iterator<Integer> ports = NetUtils.getPortRangeFromString( + config.getString(QueryableStateOptions.PROXY_PORT_RANGE, "9069")); + final int numNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS); + final int numQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS); + return new QueryableStateConfiguration(true, ports, numNetworkThreads, numQueryThreads); } else { return QueryableStateConfiguration.disabled();