Livy:337 Binding RPCServer to user provided port and not random port (#334)
* Code changes in RPCserver for user provided port * Indentation Changes * Indentation Changes * Indentation Changes * Indentation Changes * Configuring Port Range * Documentation Changed * launcher.port.range will take care of launching RPC * Checkstyle changes * Checkstyle changes * Dummy push * Code changes * Changed BindException Handling to SocketException Handling * Changed Import Order * Code changes to increase port range * Set Port isConntect to true * Indentation Changes & port range in livy-client.conf.template * Indentation changes * Changed visibilty of method private * Indentation Changes * Indenetation Changes * Unit test case to test port range * Checkstyle changes * Unit test case for port range * Added comment for Port Range Configuration and increase port range for unit test case Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/9ae24d08 Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/9ae24d08 Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/9ae24d08 Branch: refs/heads/master Commit: 9ae24d08738652ba5fd817780711d01b110d74a9 Parents: 02eef9a Author: pralabhkumar <pralabhku...@gmail.com> Authored: Thu Jun 8 13:22:25 2017 +0530 Committer: Jeff Zhang <zjf...@gmail.com> Committed: Thu Jun 8 15:52:25 2017 +0800 ---------------------------------------------------------------------- conf/livy-client.conf.template | 5 +- .../java/com/cloudera/livy/rsc/RSCConf.java | 4 +- .../com/cloudera/livy/rsc/rpc/RpcServer.java | 86 ++++++++++++++++---- .../java/com/cloudera/livy/rsc/rpc/TestRpc.java | 27 ++++++ 4 files changed, 104 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/conf/livy-client.conf.template ---------------------------------------------------------------------- diff --git a/conf/livy-client.conf.template b/conf/livy-client.conf.template index 2a92b57..06ad653 100644 --- a/conf/livy-client.conf.template +++ b/conf/livy-client.conf.template @@ -55,7 +55,8 @@ # Address for the RSC driver to connect back with it's connection info. # livy.rsc.launcher.address = -# livy.rsc.launcher.port = -1 +# Port Range on which RPC will launch . Port range in inclusive of start and end port . +# livy.rsc.launcher.port.range = 10000~10110 # How long will the RSC wait for a connection for a Livy server before shutting itself down. # livy.rsc.server.idle-timeout = 10m @@ -83,4 +84,4 @@ # livy.rsc.job-cancel.timeout = 30s # Number of statements kept in driver's memory -# livy.rsc.retained-statements = 100 \ No newline at end of file +# livy.rsc.retained-statements = 100 http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java index d1b8b39..afd935d 100644 --- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java @@ -51,8 +51,10 @@ public class RSCConf extends ClientConf<RSCConf> { // Address for the RSC driver to connect back with it's connection info. LAUNCHER_ADDRESS("launcher.address", null), + LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"), + // Setting up of this propety by user has no benefit. It is currently being used + // to pass port information from ContextLauncher to RSCDriver LAUNCHER_PORT("launcher.port", -1), - // How long will the RSC wait for a connection for a Livy server before shutting itself down. SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"), http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java b/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java index 1d3e6c5..44db976 100644 --- a/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java +++ b/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java @@ -19,7 +19,10 @@ package com.cloudera.livy.rsc.rpc; import java.io.Closeable; import java.io.IOException; +import java.net.BindException; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketException; import java.security.SecureRandom; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -61,18 +64,78 @@ public class RpcServer implements Closeable { private static final SecureRandom RND = new SecureRandom(); private final String address; - private final Channel channel; + private Channel channel; private final EventLoopGroup group; private final int port; private final ConcurrentMap<String, ClientInfo> pendingClients; private final RSCConf config; - + private final String portRange; + private static enum PortRangeSchema{START_PORT, END_PORT, MAX}; + private final String PORT_DELIMITER = "~"; + /** + * Creating RPC Server + * @param lconf + * @throws IOException + * @throws InterruptedException + */ public RpcServer(RSCConf lconf) throws IOException, InterruptedException { this.config = lconf; + this.portRange = config.get(LAUNCHER_PORT_RANGE); this.group = new NioEventLoopGroup( - this.config.getInt(RPC_MAX_THREADS), - Utils.newDaemonThreadFactory("RPC-Handler-%d")); - this.channel = new ServerBootstrap() + this.config.getInt(RPC_MAX_THREADS), + Utils.newDaemonThreadFactory("RPC-Handler-%d")); + int [] portData = getPortNumberAndRange(); + int startingPortNumber = portData[PortRangeSchema.START_PORT.ordinal()]; + int endPort = portData[PortRangeSchema.END_PORT.ordinal()]; + boolean isContected = false; + for(int tries = startingPortNumber ; tries<=endPort ; tries++){ + try { + this.channel = getChannel(tries); + isContected = true; + break; + } catch(SocketException e){ + LOG.debug("RPC not able to connect port " + tries + " " + e.getMessage()); + } + } + if(!isContected) { + throw new IOException("Unable to connect to provided ports " + this.portRange); + } + this.port = ((InetSocketAddress) channel.localAddress()).getPort(); + this.pendingClients = new ConcurrentHashMap<>(); + LOG.info("Connected to the port " + this.port); + String address = config.get(RPC_SERVER_ADDRESS); + if (address == null) { + address = config.findLocalAddress(); + } + this.address = address; + } + + /** + * Get Port Numbers + */ + private int[] getPortNumberAndRange() throws ArrayIndexOutOfBoundsException, + NumberFormatException { + String[] split = this.portRange.split(PORT_DELIMITER); + int [] portRange = new int [PortRangeSchema.MAX.ordinal()]; + try { + portRange[PortRangeSchema.START_PORT.ordinal()] = + Integer.parseInt(split[PortRangeSchema.START_PORT.ordinal()]); + portRange[PortRangeSchema.END_PORT.ordinal()] = + Integer.parseInt(split[PortRangeSchema.END_PORT.ordinal()]); + } catch(ArrayIndexOutOfBoundsException e) { + LOG.error("Port Range format is not correct " + this.portRange); + throw e; + } catch(NumberFormatException e) { + LOG.error("Port are not in numeric format " + this.portRange); + throw e; + } + return portRange; + } + /** + * @throws InterruptedException + **/ + private Channel getChannel(int portNumber) throws BindException, InterruptedException { + Channel channel = new ServerBootstrap() .group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @@ -97,19 +160,11 @@ public class RpcServer implements Closeable { .option(ChannelOption.SO_BACKLOG, 1) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) - .bind(0) + .bind(portNumber) .sync() .channel(); - this.port = ((InetSocketAddress) channel.localAddress()).getPort(); - this.pendingClients = new ConcurrentHashMap<>(); - - String address = config.get(RPC_SERVER_ADDRESS); - if (address == null) { - address = config.findLocalAddress(); - } - this.address = address; + return channel; } - /** * Tells the RPC server to expect connections from clients. * @@ -310,3 +365,4 @@ public class RpcServer implements Closeable { } } + http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java ---------------------------------------------------------------------- diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java index cf19fee..48abe94 100644 --- a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java +++ b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java @@ -18,6 +18,8 @@ package com.cloudera.livy.rsc.rpc; import java.io.Closeable; +import java.io.IOException; +import java.net.SocketException; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CountDownLatch; @@ -186,6 +188,31 @@ public class TestRpc { assertEquals(outbound.message, reply.message); } + @Test + public void testPortRange() throws Exception { + String portRange = "a~b"; + emptyConfig.set(LAUNCHER_PORT_RANGE, portRange); + try { + autoClose(new RpcServer(emptyConfig)); + } catch (Exception ee) { + assertTrue(ee instanceof NumberFormatException); + } + portRange = "11000"; + emptyConfig.set(LAUNCHER_PORT_RANGE, portRange); + try { + autoClose(new RpcServer(emptyConfig)); + } catch (Exception ee) { + assertTrue(ee instanceof ArrayIndexOutOfBoundsException); + } + portRange = "11000~11110"; + emptyConfig.set(LAUNCHER_PORT_RANGE, portRange); + String [] portRangeData = portRange.split("~"); + int startPort = Integer.parseInt(portRangeData[0]); + int endPort = Integer.parseInt(portRangeData[1]); + RpcServer server = autoClose(new RpcServer(emptyConfig)); + assertTrue(startPort <= server.getPort() && server.getPort() <= endPort); + } + private void transfer(Rpc serverRpc, Rpc clientRpc) { EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel(); EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();