Repository: zeppelin Updated Branches: refs/heads/master 3b1a03f38 -> 382479fd5
ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service ### What is this PR for? This PR adds a new configuration property, `zeppelin.interpreter.portRange`, which controls the port range used by the interpreter process's thrift service. Some users require this for security reasons. ### What type of PR is it? [Improvement | Feature ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3040 ### How should this be tested? Test manually: set `zeppelin.interpreter.portRange`, launch the python interpreter, and verify that the interpreter process's port is within the configured portRange. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2661 from zjffdu/ZEPPELIN-3040 and squashes the following commits: a87c425 [Jeff Zhang] address comments 7e885bd [Jeff Zhang] ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/382479fd Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/382479fd Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/382479fd Branch: refs/heads/master Commit: 382479fd502b5872fe0f7914d9901c4473069cc2 Parents: 3b1a03f Author: Jeff Zhang <zjf...@apache.org> Authored: Sun Nov 12 09:18:41 2017 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Mon Nov 13 15:27:17 2017 +0800 ---------------------------------------------------------------------- bin/interpreter.sh | 15 +++++---- .../zeppelin/helium/ZeppelinDevServer.java | 2 +- .../zeppelin/conf/ZeppelinConfiguration.java | 5 +++ .../remote/RemoteInterpreterServer.java | 28 ++++++++++------- .../remote/RemoteInterpreterUtils.java | 32 +++++++++++++------- .../remote/RemoteInterpreterServerTest.java | 4 +-- .../remote/RemoteInterpreterUtilsTest.java | 10 +++--- 
.../launcher/ShellScriptLauncher.java | 2 +- .../remote/RemoteInterpreterManagedProcess.java | 27 +++++++++-------- 9 files changed, 76 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/bin/interpreter.sh ---------------------------------------------------------------------- diff --git a/bin/interpreter.sh b/bin/interpreter.sh index 4e983ec..458ffc0 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -20,10 +20,10 @@ bin=$(dirname "${BASH_SOURCE-$0}") bin=$(cd "${bin}">/dev/null; pwd) function usage() { - echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>" + echo "usage) $0 -p <port> -r <intp_port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>" } -while getopts "hc:p:d:l:v:u:g:" o; do +while getopts "hc:p:r:d:l:v:u:g:" o; do case ${o} in h) usage @@ -36,7 +36,10 @@ while getopts "hc:p:d:l:v:u:g:" o; do CALLBACK_HOST=${OPTARG} # This will be used callback host ;; p) - PORT=${OPTARG} # This will be used callback port + PORT=${OPTARG} # This will be used for callback port + ;; + r) + INTP_PORT=${OPTARG} # This will be used for interpreter process port ;; l) LOCAL_INTERPRETER_REPO=${OPTARG} @@ -204,12 +207,12 @@ fi if [[ -n "${SPARK_SUBMIT}" ]]; then if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then - INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}` + INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path 
\"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}` else - INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}` + INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}` fi else - INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} ` + INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}` fi if [[ ! 
-z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java ---------------------------------------------------------------------- diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java index 3a5199d..607839e 100644 --- a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java +++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java @@ -39,7 +39,7 @@ public class ZeppelinDevServer extends private DevInterpreter interpreter = null; private InterpreterOutput out; public ZeppelinDevServer(int port) throws TException, IOException { - super(null, port); + super(null, port, ":"); } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index f280475..1bc242d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -488,6 +488,10 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE); } + public String getInterpreterPortRange() { + return getString(ConfVars.ZEPPELIN_INTERPRETER_PORTRANGE); + } + public boolean isWindowsPath(String path){ return path.matches("^[A-Za-z]:\\\\.*"); } @@ -705,6 +709,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { 
ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""), ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"), + ZEPPELIN_INTERPRETER_PORTRANGE("zeppelin.interpreter.portRange", ":"), ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class", "org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager"), http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index cb0488c..86f35c6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -132,19 +132,19 @@ public class RemoteInterpreterServer private boolean isTest; - public RemoteInterpreterServer(String callbackHost, int port) throws IOException, - TTransportException { - this(callbackHost, port, false); + public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange) + throws IOException, TTransportException { + this(callbackHost, callbackPort, portRange, false); } - public RemoteInterpreterServer(String callbackHost, int port, boolean isTest) - throws TTransportException, IOException { + public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange, + boolean isTest) throws TTransportException, IOException { if (null != callbackHost) { this.callbackHost = callbackHost; - this.callbackPort = port; + this.callbackPort = callbackPort; } else { // DevInterpreter - this.port = port; + this.port = callbackPort; } 
this.isTest = isTest; @@ -152,14 +152,16 @@ public class RemoteInterpreterServer TServerSocket serverTransport; if (null == callbackHost) { // Dev Interpreter - serverTransport = new TServerSocket(port); + serverTransport = new TServerSocket(callbackPort); } else { - this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange); + this.port = serverTransport.getServerSocket().getLocalPort(); this.host = RemoteInterpreterUtils.findAvailableHostAddress(); - serverTransport = new TServerSocket(this.port); + logger.info("Launching ThriftServer at " + this.host + ":" + this.port); } server = new TThreadPoolServer( new TThreadPoolServer.Args(serverTransport).processor(processor)); + logger.info("Starting remote interpreter server on port {}", port); remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>()); remoteWorksController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool); } @@ -254,12 +256,16 @@ public class RemoteInterpreterServer throws TTransportException, InterruptedException, IOException { String callbackHost = null; int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT; + String portRange = ":"; if (args.length > 0) { callbackHost = args[0]; port = Integer.parseInt(args[1]); + if (args.length > 2) { + portRange = args[2]; + } } RemoteInterpreterServer remoteInterpreterServer = - new RemoteInterpreterServer(callbackHost, port); + new RemoteInterpreterServer(callbackHost, port, portRange); remoteInterpreterServer.start(); remoteInterpreterServer.join(); System.exit(0); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java index 835199a..223588f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java @@ -29,11 +29,15 @@ import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; import java.util.Collections; + +import org.apache.commons.lang.StringUtils; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +52,12 @@ public class RemoteInterpreterUtils { public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException { - return findRandomAvailablePortOnAllLocalInterfaces(":"); + int port; + try (ServerSocket socket = new ServerSocket(0);) { + port = socket.getLocalPort(); + socket.close(); + } + return port; } /** @@ -58,21 +67,22 @@ public class RemoteInterpreterUtils { * @return * @throws IOException */ - public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange) + public static TServerSocket createTServerSocket(String portRange) throws IOException { + TServerSocket tSocket = null; // ':' is the default value which means no constraints on the portRange - if (portRange == null || portRange.equals(":")) { - int port; - try (ServerSocket socket = new ServerSocket(0);) { - port = socket.getLocalPort(); - socket.close(); + if (StringUtils.isBlank(portRange) || portRange.equals(":")) { + try { + tSocket = new TServerSocket(0); + return tSocket; + } catch (TTransportException e) { + throw new IOException("Fail 
to create TServerSocket", e); } - return port; } // valid user registered port https://en.wikipedia.org/wiki/Registered_port int start = 1024; - int end = 49151; + int end = 65535; String[] ports = portRange.split(":", -1); if (!ports[0].isEmpty()) { start = Integer.parseInt(ports[0]); @@ -82,8 +92,8 @@ public class RemoteInterpreterUtils { } for (int i = start; i <= end; ++i) { try { - ServerSocket socket = new ServerSocket(i); - return socket.getLocalPort(); + tSocket = new TServerSocket(i); + return tSocket; } catch (Exception e) { // ignore this } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java index b2fcae1..1cb2cb6 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java @@ -43,7 +43,7 @@ public class RemoteInterpreterServerTest { @Test public void testStartStop() throws InterruptedException, IOException, TException { RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", - RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true); + RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true); assertEquals(false, server.isRunning()); server.start(); @@ -91,7 +91,7 @@ public class RemoteInterpreterServerTest { @Test public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException { RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", - 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true); + RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true); assertEquals(false, server.isRunning()); server.start(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java index afbbcbd..8eeb85a 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java @@ -26,17 +26,17 @@ import static org.junit.Assert.assertTrue; public class RemoteInterpreterUtilsTest { @Test - public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException { - assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0); + public void testCreateTServerSocket() throws IOException { + assertTrue(RemoteInterpreterUtils.createTServerSocket(":").getServerSocket().getLocalPort() > 0); String portRange = ":30000"; - assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) <= 30000); + assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort() <= 30000); portRange = "30000:"; - assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) >= 30000); + assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort() >= 30000); portRange = "30000:40000"; - int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange); + int port = 
RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort(); assertTrue(port >= 30000 && port <= 40000); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java index 0966ec5..8c86129 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java @@ -64,7 +64,7 @@ public class ShellScriptLauncher extends InterpreterLauncher { + context.getInterpreterSettingId(); return new RemoteInterpreterManagedProcess( runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), - zConf.getCallbackPortRange(), + zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(), zConf.getInterpreterDir() + "/" + groupName, localRepoPath, buildEnvFromProperties(), connectTimeout, name); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/382479fd/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 9f8f346..27e826c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -52,11 +52,12 @@ 
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess RemoteInterpreterManagedProcess.class); private final String interpreterRunner; - private final String portRange; + private final String callbackPortRange; + private final String interpreterPortRange; private DefaultExecutor executor; private ExecuteWatchdog watchdog; private AtomicBoolean running = new AtomicBoolean(false); - TServer callbackServer; + private TServer callbackServer; private String host = null; private int port = -1; private final String interpreterDir; @@ -67,7 +68,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess public RemoteInterpreterManagedProcess( String intpRunner, - String portRange, + String callbackPortRange, + String interpreterPortRange, String intpDir, String localRepoDir, Map<String, String> env, @@ -75,7 +77,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess String interpreterSettingName) { super(connectTimeout); this.interpreterRunner = intpRunner; - this.portRange = portRange; + this.callbackPortRange = callbackPortRange; + this.interpreterPortRange = interpreterPortRange; this.env = env; this.interpreterDir = intpDir; this.localRepoDir = localRepoDir; @@ -84,7 +87,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess @Override public String getHost() { - return "localhost"; + return host; } @Override @@ -97,11 +100,11 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess // start server process final String callbackHost; final int callbackPort; + TServerSocket tSocket = null; try { - port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange); - logger.info("Choose port {} for RemoteInterpreterProcess", port); + tSocket = RemoteInterpreterUtils.createTServerSocket(callbackPortRange); + callbackPort = tSocket.getServerSocket().getLocalPort(); callbackHost = RemoteInterpreterUtils.findAvailableHostAddress(); - 
callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); } catch (IOException e1) { throw new RuntimeException(e1); } @@ -109,12 +112,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess logger.info("Thrift server for callback will start. Port: {}", callbackPort); try { callbackServer = new TThreadPoolServer( - new TThreadPoolServer.Args(new TServerSocket(callbackPort)).processor( + new TThreadPoolServer.Args(tSocket).processor( new RemoteInterpreterCallbackService.Processor<>( new RemoteInterpreterCallbackService.Iface() { @Override public void callback(CallbackInfo callbackInfo) throws TException { - logger.info("Registered: {}", callbackInfo); + logger.info("RemoteInterpreterServer Registered: {}", callbackInfo); host = callbackInfo.getHost(); port = callbackInfo.getPort(); running.set(true); @@ -145,8 +148,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess Thread.sleep(500); } logger.debug("callbackServer is serving now"); - } catch (TTransportException e) { - logger.error("callback server error.", e); } catch (InterruptedException e) { logger.warn("", e); } @@ -158,6 +159,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess cmdLine.addArgument(callbackHost, false); cmdLine.addArgument("-p", false); cmdLine.addArgument(Integer.toString(callbackPort), false); + cmdLine.addArgument("-r", false); + cmdLine.addArgument(interpreterPortRange, false); if (isUserImpersonate && !userName.equals("anonymous")) { cmdLine.addArgument("-u", false); cmdLine.addArgument(userName, false);