Repository: incubator-zeppelin Updated Branches: refs/heads/master 1b3784d23 -> 90cc2b3d1
Shell interpreter improvements Creating new PR with the changes from https://github.com/apache/incubator-zeppelin/pull/615 Please check the above PR for prior discussions. ### What is this PR for? *Provide ability to run shell commands in parallel *Provide ability to cancel a shell command *Propagate the error from shell commands to UI ### What type of PR is it? Improvement ### Todos NA ### Is there a relevant Jira issue? No ### How should this be tested? *To check parallelism, run more than 10 shell commands concurrently. *To verify whether errors are propagated to the UI, execute a shell command which will error out (the simplest being a cd into a non-existent directory) *To verify the cancel functionality, try cancelling a shell command that is running. ### Screenshots (if appropriate) NA ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: karuppayya <[email protected]> Author: Karup <[email protected]> Closes #666 from Karuppayya/shell_imp and squashes the following commits: 6293781 [karuppayya] Fix test failure, fixes based on discussion 431cc79 [karuppayya] Shell interpreter improvements 825f696 [karuppayya] merge master 4fd2113 [Karup] Send exitvalue of shell command in interpreter result d259c48 [karuppayya] Fix typo, log exit value of a succesful shell commnad 351888d [karuppayya] Increase thread pool size 8cd6fd4 [karuppayya] Add log messages 9eb3eca [karuppayya] Fix command timeout period 87364b1 [karuppayya] Remove unnecessary changes fcdc494 [karuppayya] Fix indentation 30078ac [karuppayya] fix 540bfa8 [Karup] Merge branch 'shell1' of github.com:Karuppayya/incubator-zeppelin into shell1 7d938bd [Karup] fix b0a97a1 [Karup] fix Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/90cc2b3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/90cc2b3d Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/90cc2b3d Branch: refs/heads/master Commit: 90cc2b3d1b6902cc56c7231a6802a3baac3ee0d7 Parents: 1b3784d Author: karuppayya <[email protected]> Authored: Sun Jan 24 21:24:16 2016 +0530 Committer: Felix Cheung <[email protected]> Committed: Thu Jan 28 15:14:00 2016 -0800 ---------------------------------------------------------------------- .../apache/zeppelin/shell/ShellInterpreter.java | 59 +++++++++++++++++--- .../interpreter/remote/RemoteInterpreter.java | 9 ++- .../remote/RemoteInterpreterProcess.java | 6 ++ .../remote/RemoteInterpreterServer.java | 6 +- .../zeppelin/conf/ZeppelinConfiguration.java | 1 + .../interpreter/InterpreterFactory.java | 3 +- 6 files changed, 70 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java index 090c0e9..85aafc5 100644 --- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java +++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java @@ -19,18 +19,22 @@ package org.apache.zeppelin.shell; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.ExecuteException; import org.apache.commons.exec.ExecuteWatchdog; +import org.apache.commons.exec.Executor; import org.apache.commons.exec.PumpStreamHandler; import org.apache.zeppelin.interpreter.Interpreter; import 
org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; @@ -41,6 +45,7 @@ import org.slf4j.LoggerFactory; */ public class ShellInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(ShellInterpreter.class); + private static final String EXECUTOR_KEY = "executor"; int commandTimeOut = 600000; static { @@ -61,31 +66,67 @@ public class ShellInterpreter extends Interpreter { @Override public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { logger.debug("Run shell command '" + cmd + "'"); - long start = System.currentTimeMillis(); CommandLine cmdLine = CommandLine.parse("bash"); cmdLine.addArgument("-c", false); cmdLine.addArgument(cmd, false); DefaultExecutor executor = new DefaultExecutor(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - executor.setStreamHandler(new PumpStreamHandler(outputStream)); - + ByteArrayOutputStream errorStream = new ByteArrayOutputStream(); + executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream)); executor.setWatchdog(new ExecuteWatchdog(commandTimeOut)); + + Job runningJob = getRunningJob(contextInterpreter.getParagraphId()); + Map<String, Object> info = runningJob.info(); + info.put(EXECUTOR_KEY, executor); try { - int exitValue = executor.execute(cmdLine); + int exitVal = executor.execute(cmdLine); + logger.info("Paragraph " + contextInterpreter.getParagraphId() + + "return with exit value: " + exitVal); return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString()); } catch (ExecuteException e) { + int exitValue = e.getExitValue(); logger.error("Can not run " + cmd, e); - return new InterpreterResult(Code.ERROR, e.getMessage()); + Code code 
= Code.ERROR; + String msg = errorStream.toString(); + if (exitValue == 143) { + code = Code.INCOMPLETE; + msg = msg + "Paragraph received a SIGTERM.\n"; + logger.info("The paragraph " + contextInterpreter.getParagraphId() + + " stopped executing: " + msg); + } + msg += "Exitvalue: " + exitValue; + return new InterpreterResult(code, msg); } catch (IOException e) { logger.error("Can not run " + cmd, e); return new InterpreterResult(Code.ERROR, e.getMessage()); } } - @Override - public void cancel(InterpreterContext context) {} + private Job getRunningJob(String paragraphId) { + Job foundJob = null; + Collection<Job> jobsRunning = getScheduler().getJobsRunning(); + for (Job job : jobsRunning) { + if (job.getId().equals(paragraphId)) { + foundJob = job; + } + } + return foundJob; + } @Override + public void cancel(InterpreterContext context) { + Job runningJob = getRunningJob(context.getParagraphId()); + if (runningJob != null) { + Map<String, Object> info = runningJob.info(); + Object object = info.get(EXECUTOR_KEY); + if (object != null) { + Executor executor = (Executor) object; + ExecuteWatchdog watchdog = executor.getWatchdog(); + watchdog.destroyProcess(); + } + } + } + @Override public FormType getFormType() { return FormType.SIMPLE; } @@ -97,8 +138,8 @@ public class ShellInterpreter extends Interpreter { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - ShellInterpreter.class.getName() + this.hashCode()); + return SchedulerFactory.singleton().createOrGetParallelScheduler( + ShellInterpreter.class.getName() + this.hashCode(), 10); } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index d2a24e8..d8cb223 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -57,14 +57,15 @@ public class RemoteInterpreter extends Interpreter { FormType formType; boolean initialized; private Map<String, String> env; - private int connectTimeout; + private int maxPoolSize; public RemoteInterpreter(Properties property, String className, String interpreterRunner, String interpreterPath, int connectTimeout, + int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener) { super(property); this.className = className; @@ -73,6 +74,7 @@ public class RemoteInterpreter extends Interpreter { this.interpreterPath = interpreterPath; env = new HashMap<String, String>(); this.connectTimeout = connectTimeout; + this.maxPoolSize = maxPoolSize; this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; } @@ -89,6 +91,7 @@ public class RemoteInterpreter extends Interpreter { this.interpreterPath = interpreterPath; this.env = env; this.connectTimeout = connectTimeout; + this.maxPoolSize = 10; this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; } @@ -124,7 +127,7 @@ public class RemoteInterpreter extends Interpreter { RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); int rc = interpreterProcess.reference(getInterpreterGroup()); - + interpreterProcess.setMaxPoolSize(this.maxPoolSize); synchronized (interpreterProcess) { // when first process created if (rc == 1) { @@ -330,7 +333,7 @@ public class RemoteInterpreter extends Interpreter { @Override public Scheduler getScheduler() { - int maxConcurrency = 10; + int maxConcurrency = maxPoolSize; RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); if (interpreterProcess == null) { 
return null; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 5612a2b..5237b0b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -261,6 +261,12 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { } } + public void setMaxPoolSize(int size) { + if (clientPool != null) { + //Size + 2 for progress poller , cancel operation + clientPool.setMaxTotal(size + 2); + } + } /** * Called when angular object is updated in client side to propagate * change to the remote process http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index a59293b..02736fe 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -263,6 +263,7 @@ public class RemoteInterpreterServer private Interpreter interpreter; private String script; private InterpreterContext context; + private Map<String, Object> infos; 
public InterpretJob( String jobId, @@ -285,7 +286,10 @@ public class RemoteInterpreterServer @Override public Map<String, Object> info() { - return null; + if (infos == null) { + infos = new HashMap<>(); + } + return infos; } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 6d726f5..9e606ee 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -457,6 +457,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { + "org.apache.zeppelin.jdbc.JDBCInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), + ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), // use specified notebook (id) as homescreen http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 039d970..3cd1257 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -664,9 +664,10 @@ public class InterpreterFactory { Properties property) { int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE); LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter( property, className, conf.getInterpreterRemoteRunnerPath(), - interpreterPath, connectTimeout, remoteInterpreterProcessListener)); + interpreterPath, connectTimeout, maxPoolSize, remoteInterpreterProcessListener)); return intp; }
