Repository: zeppelin Updated Branches: refs/heads/branch-0.7 50b7d235f -> 5e7d2b472
ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service ### What is this PR for? This PR is trying to add new configuration zeppelin.interpreter.portRange which control the portRange of interpreter process. This is required by some users for security reason. ### What type of PR is it? [Improvement | Feature] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3040 ### How should this be tested? Manually test. Set zeppelin.interpreter.portRange and launch python interpreter, verify it is in the proper portRange. ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2666 from zjffdu/ZEPPELIN-3040-0.7 and squashes the following commits: cd1ca62 [Jeff Zhang] ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5e7d2b47 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5e7d2b47 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5e7d2b47 Branch: refs/heads/branch-0.7 Commit: 5e7d2b472cdfe2d215fb528559c7484a3abd455f Parents: 50b7d23 Author: Jeff Zhang <zjf...@apache.org> Authored: Fri Nov 10 16:05:23 2017 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Mon Nov 13 15:32:40 2017 +0800 ---------------------------------------------------------------------- .../interpreter/remote/RemoteInterpreter.java | 6 ++- .../remote/RemoteInterpreterManagedProcess.java | 7 +++- .../remote/RemoteInterpreterUtils.java | 42 ++++++++++++++++++++ .../remote/RemoteInterpreterProcessTest.java | 7 ++-- .../zeppelin/conf/ZeppelinConfiguration.java | 7 +++- .../interpreter/InterpreterFactory.java | 3 +- 6 files changed, 64 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e7d2b47/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index a9c86c4..e8b2ebd 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -61,6 +61,7 @@ public class RemoteInterpreter extends Interpreter { private int maxPoolSize; private String host; private int port; + private String portRange; private String userName; private Boolean isUserImpersonate; private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT; @@ -72,7 +73,7 @@ public class RemoteInterpreter extends Interpreter { String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout, int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener, ApplicationEventListener appListener, String userName, Boolean isUserImpersonate, - int outputLimit) { + int outputLimit, String portRange) { super(property); this.sessionKey = sessionKey; this.className = className; @@ -88,6 +89,7 @@ public class RemoteInterpreter extends Interpreter { this.userName = userName; this.isUserImpersonate = isUserImpersonate; this.outputLimit = outputLimit; + this.portRange = portRange; } @@ -184,7 +186,7 @@ public class RemoteInterpreter extends Interpreter { } else { // create new remote process remoteProcess = new RemoteInterpreterManagedProcess( - interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout, + interpreterRunner, interpreterPath, localRepoPath, portRange, env, connectTimeout, remoteInterpreterProcessListener, applicationEventListener); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e7d2b47/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index f5d73ed..d638f37 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -42,6 +42,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess private ExecuteWatchdog watchdog; boolean running = false; private int port = -1; + private String portRange; private final String interpreterDir; private final String localRepoDir; @@ -51,6 +52,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess String intpRunner, String intpDir, String localRepoDir, + String portRange, Map<String, String> env, int connectTimeout, RemoteInterpreterProcessListener listener, @@ -61,12 +63,14 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess this.env = env; this.interpreterDir = intpDir; this.localRepoDir = localRepoDir; + this.portRange = portRange; } RemoteInterpreterManagedProcess(String intpRunner, String intpDir, String localRepoDir, + String portRange, Map<String, String> env, RemoteInterpreterEventPoller remoteInterpreterEventPoller, int connectTimeout) { @@ -76,6 +80,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess this.env = env; this.interpreterDir = intpDir; this.localRepoDir = localRepoDir; + this.portRange = portRange; } @Override @@ -92,7 +97,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess public void start(String userName, Boolean isUserImpersonate) { // start server process try { - port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange); } catch (IOException e1) { throw new InterpreterException(e1); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e7d2b47/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java index 2937e2d..bdf98ff 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.interpreter.remote; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,47 @@ public class RemoteInterpreterUtils { return port; } + /** + * start:end + * + * @param portRange + * @return + * @throws IOException + */ + public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange) + throws IOException { + + // ':' is the default value which means no constraints on the portRange + if (StringUtils.isBlank(portRange) || portRange.equals(":")) { + int port; + try (ServerSocket socket = new ServerSocket(0);) { + port = socket.getLocalPort(); + socket.close(); + } + return port; + } + // valid user registered port https://en.wikipedia.org/wiki/Registered_port + int start = 1024; + int end = 65535; + String[] ports = portRange.split(":", -1); + if (!ports[0].isEmpty()) { + start = Integer.parseInt(ports[0]); + } + if (!ports[1].isEmpty()) { + end = Integer.parseInt(ports[1]); + } + for (int i = start; i <= end; ++i) { + try { + ServerSocket socket = new ServerSocket(i); + socket.close(); + return socket.getLocalPort(); + } catch (Exception e) { + // ignore this + } + } + throw new IOException("No available port in the portRange: " + portRange); + } + public static boolean checkIfRemoteEndpointAccessible(String host, int port) { try { Socket discover = new Socket(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e7d2b47/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java index 39a17ae..150f6a9 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -43,7 +43,7 @@ public class RemoteInterpreterProcessTest { public void testStartStop() { InterpreterGroup intpGroup = new InterpreterGroup(); RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess( - INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(), + INTERPRETER_SCRIPT, "nonexists", "fakeRepo", ":", new HashMap<String, String>(), 10 * 1000, null, null); assertFalse(rip.isRunning()); assertEquals(0, rip.referenceCount()); @@ -60,7 +60,7 @@ public class RemoteInterpreterProcessTest { public void testClientFactory() throws Exception { InterpreterGroup intpGroup = new InterpreterGroup(); RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess( - INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(), + INTERPRETER_SCRIPT, "nonexists", "fakeRepo", ":", new HashMap<String, String>(), mock(RemoteInterpreterEventPoller.class), 10 * 1000); rip.reference(intpGroup, "anonymous", false); assertEquals(0, rip.getNumActiveClient()); @@ -102,6 +102,7 @@ public class RemoteInterpreterProcessTest { INTERPRETER_SCRIPT, "nonexists", "fakeRepo", + ":", new HashMap<String, String>(), mock(RemoteInterpreterEventPoller.class) , 10 * 1000); @@ -116,7 +117,7 @@ public class RemoteInterpreterProcessTest { public void testPropagateError() throws TException, InterruptedException { InterpreterGroup intpGroup = new InterpreterGroup(); RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess( - "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(), + "echo hello_world", "nonexists", "fakeRepo", ":", new HashMap<String, String>(), 10 * 1000, null, null); assertFalse(rip.isRunning()); assertEquals(0, rip.referenceCount()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e7d2b47/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 242a890..d2df6d8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -492,6 +492,9 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT); } + public String getInterpreterPortRange() { + return getString(ConfVars.ZEPPELIN_INTERPRETER_PORTRANGE); + } public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf, ConfigurationKeyPredicate predicate) { @@ -639,7 +642,9 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"), ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", null), ZEPPELIN_SERVER_STRICT_TRANSPORT("zeppelin.server.strict.transport", "max-age=631138519"), - ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"); + ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"), + + ZEPPELIN_INTERPRETER_PORTRANGE("zeppelin.interpreter.portRange", ":"); private String varName; @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e7d2b47/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 2091dfd..accbfcf 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -316,7 +316,8 @@ public class InterpreterFactory implements InterpreterGroupFactory { new RemoteInterpreter(property, interpreterSessionKey, className, interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate, - conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)); + conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT), + conf.getInterpreterPortRange()); remoteInterpreter.addEnv(env); return new LazyOpenInterpreter(remoteInterpreter);