This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new b0e45ef [ZEPPELIN-4443]. Unclear message when R is not installed b0e45ef is described below commit b0e45ef6ab744e3c82728b2240fc0106dad3e0e4 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed Nov 13 13:01:41 2019 +0800 [ZEPPELIN-4443]. Unclear message when R is not installed ### What is this PR for? This PR utilizes ProcessLauncher to launch the R process and captures the error output when it fails to launch the R process. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4443 ### How should this be tested? * A unit test is added ### Screenshots (if appropriate) ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3520 from zjffdu/ZEPPELIN-4443 and squashes the following commits: 79e51cfee [Jeff Zhang] [ZEPPELIN-4443]. 
Unclear message when R is not installed --- .../java/org/apache/zeppelin/spark/ZeppelinR.java | 203 ++++++++++----------- .../zeppelin/spark/SparkRInterpreterTest.java | 35 +++- .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 2 +- .../zeppelin/interpreter/util/ProcessLauncher.java | 12 +- 4 files changed, 140 insertions(+), 112 deletions(-) diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index 60c5b17..00226e9 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -22,9 +22,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkRBackend; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterOutput; -import org.apache.zeppelin.interpreter.InterpreterOutputListener; -import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; +import org.apache.zeppelin.interpreter.util.ProcessLauncher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,30 +34,19 @@ import java.util.Map; /** * R repl interaction */ -public class ZeppelinR implements ExecuteResultHandler { - private static Logger logger = LoggerFactory.getLogger(ZeppelinR.class); +public class ZeppelinR { + private static Logger LOGGER = LoggerFactory.getLogger(ZeppelinR.class); private final SparkRInterpreter sparkRInterpreter; private final String rCmdPath; private final SparkVersion sparkVersion; private final int timeout; - private DefaultExecutor executor; - private InterpreterOutputStream outputStream; - private PipedOutputStream input; + private RProcessLogOutputStream processOutputStream; private final String scriptPath; private final String libPath; - static Map<Integer, ZeppelinR> zeppelinR = 
Collections.synchronizedMap( - new HashMap<Integer, ZeppelinR>()); - - private InterpreterOutput initialOutput; + static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap(new HashMap()); private final int port; - private boolean rScriptRunning; - - /** - * To be notified R repl initialization - */ - boolean rScriptInitialized = false; - Integer rScriptInitializeNotifier = new Integer(0); + private RProcessLauncher rProcessLauncher; /** * Request to R repl @@ -67,6 +54,10 @@ public class ZeppelinR implements ExecuteResultHandler { Request rRequestObject = null; Integer rRequestNotifier = new Integer(0); + public void setInterpreterOutput(InterpreterOutput out) { + processOutputStream.setInterpreterOutput(out); + } + /** * Request object * @@ -151,25 +142,22 @@ public class ZeppelinR implements ExecuteResultHandler { cmd.addArgument(SparkRBackend.socketSecret()); } // dump out the R command to facilitate manually running it, e.g. for fault diagnosis purposes - logger.debug("R Command: " + cmd.toString()); - - executor = new DefaultExecutor(); - outputStream = new SparkRInterpreterOutputStream(logger, sparkRInterpreter); - - input = new PipedOutputStream(); - PipedInputStream in = new PipedInputStream(input); - - PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in); - executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); - executor.setStreamHandler(streamHandler); + LOGGER.info("R Command: " + cmd.toString()); + processOutputStream = new RProcessLogOutputStream(sparkRInterpreter); Map env = EnvironmentUtils.getProcEnvironment(); - - - initialOutput = new InterpreterOutput(null); - outputStream.setInterpreterOutput(initialOutput); - executor.execute(cmd, env, this); - rScriptRunning = true; - + rProcessLauncher = new RProcessLauncher(cmd, env, processOutputStream); + rProcessLauncher.launch(); + rProcessLauncher.waitForReady(30 * 1000); + + if (!rProcessLauncher.isRunning()) { + if 
(rProcessLauncher.isLaunchTimeout()) { + throw new IOException("Launch r process is time out.\n" + + rProcessLauncher.getErrorMessage()); + } else { + throw new IOException("Fail to launch r process.\n" + + rProcessLauncher.getErrorMessage()); + } + } // flush output eval("cat('')"); } @@ -222,33 +210,31 @@ public class ZeppelinR implements ExecuteResultHandler { } } + private boolean isRProcessInitialized() { + return rProcessLauncher != null && rProcessLauncher.isRunning(); + } + /** * Send request to r repl and return response * @return responseValue */ - private Object request() throws RuntimeException, InterpreterException { - if (!rScriptRunning) { + private Object request() throws RuntimeException { + if (!isRProcessInitialized()) { throw new RuntimeException("r repl is not running"); } - // wait for rscript initialized - if (!rScriptInitialized) { - waitForRScriptInitialized(); - } - rResponseValue = null; - synchronized (rRequestNotifier) { rRequestNotifier.notify(); } Object respValue = null; synchronized (rResponseNotifier) { - while (rResponseValue == null && rScriptRunning) { + while (rResponseValue == null && isRProcessInitialized()) { try { rResponseNotifier.wait(1000); } catch (InterruptedException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } } respValue = rResponseValue; @@ -263,39 +249,6 @@ public class ZeppelinR implements ExecuteResultHandler { } /** - * Wait until src/main/resources/R/zeppelin_sparkr.R is initialized - * and call onScriptInitialized() - * - * @throws InterpreterException - */ - private void waitForRScriptInitialized() throws InterpreterException { - synchronized (rScriptInitializeNotifier) { - long startTime = System.nanoTime(); - while (rScriptInitialized == false && - rScriptRunning && - System.nanoTime() - startTime < 10L * 1000 * 1000000) { - try { - rScriptInitializeNotifier.wait(1000); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - } - - String 
errorMessage = ""; - try { - initialOutput.flush(); - errorMessage = new String(initialOutput.toByteArray()); - } catch (IOException e) { - e.printStackTrace(); - } - - if (rScriptInitialized == false) { - throw new InterpreterException("sparkr is not responding " + errorMessage); - } - } - - /** * invoked by src/main/resources/R/zeppelin_sparkr.R * @return */ @@ -305,7 +258,7 @@ public class ZeppelinR implements ExecuteResultHandler { try { rRequestNotifier.wait(1000); } catch (InterruptedException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } } @@ -332,10 +285,7 @@ public class ZeppelinR implements ExecuteResultHandler { * invoked by src/main/resources/R/zeppelin_sparkr.R */ public void onScriptInitialized() { - synchronized (rScriptInitializeNotifier) { - rScriptInitialized = true; - rScriptInitializeNotifier.notifyAll(); - } + rProcessLauncher.initialized(); } /** @@ -359,14 +309,16 @@ public class ZeppelinR implements ExecuteResultHandler { throw new InterpreterException(e); } - logger.info("File {} created", scriptPath); + LOGGER.info("File {} created", scriptPath); } /** * Terminate this R repl */ public void close() { - executor.getWatchdog().destroyProcess(); + if (rProcessLauncher != null) { + rProcessLauncher.stop(); + } new File(scriptPath).delete(); zeppelinR.remove(hashCode()); } @@ -381,36 +333,58 @@ public class ZeppelinR implements ExecuteResultHandler { return zeppelinR.get(hashcode); } - /** - * Pass InterpreterOutput to capture the repl output - * @param out - */ - public void setInterpreterOutput(InterpreterOutput out) { - outputStream.setInterpreterOutput(out); - } + class RProcessLauncher extends ProcessLauncher { - @Override - public void onProcessComplete(int i) { - logger.info("process complete {}", i); - rScriptRunning = false; - } + public RProcessLauncher(CommandLine commandLine, + Map<String, String> envs, + ProcessLogOutputStream processLogOutput) { + super(commandLine, envs, processLogOutput); + } - 
@Override - public void onProcessFailed(ExecuteException e) { - logger.error(e.getMessage(), e); - rScriptRunning = false; - } + @Override + public void waitForReady(int timeout) { + long startTime = System.currentTimeMillis(); + synchronized (this) { + while (state == State.LAUNCHED) { + LOGGER.info("Waiting for R process initialized"); + try { + wait(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if ((System.currentTimeMillis() - startTime) > timeout) { + onTimeout(); + break; + } + } + } + } + public void initialized() { + synchronized (this) { + this.state = State.RUNNING; + notify(); + } + } + } - public static class SparkRInterpreterOutputStream extends InterpreterOutputStream { + public static class RProcessLogOutputStream extends ProcessLauncher.ProcessLogOutputStream { + private InterpreterOutput interpreterOutput; private SparkRInterpreter sparkRInterpreter; - public SparkRInterpreterOutputStream(Logger logger, SparkRInterpreter sparkRInterpreter) { - super(logger); + public RProcessLogOutputStream(SparkRInterpreter sparkRInterpreter) { this.sparkRInterpreter = sparkRInterpreter; } + /** + * Redirect r process output to interpreter output. 
+ * @param interpreterOutput + */ + public void setInterpreterOutput(InterpreterOutput interpreterOutput) { + this.interpreterOutput = interpreterOutput; + } + @Override protected void processLine(String s, int i) { super.processLine(s, i); @@ -418,6 +392,21 @@ public class ZeppelinR implements ExecuteResultHandler { || s.contains("Execution halted")) { // spark 1.x sparkRInterpreter.getRbackendDead().set(true); } + if (interpreterOutput != null) { + try { + interpreterOutput.write(s); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void close() throws IOException { + super.close(); + if (interpreterOutput != null) { + interpreterOutput.close(); + } } } } diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java index 3838053..1aaef8a 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java @@ -17,6 +17,8 @@ package org.apache.zeppelin.spark; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -24,11 +26,13 @@ import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; +import org.apache.zeppelin.python.PythonInterpreter; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; import java.util.Properties; @@ -78,8 +82,6 @@ public class 
SparkRInterpreterTest { @Test public void testSparkRInterpreter() throws InterpreterException, InterruptedException { - - InterpreterResult result = sparkRInterpreter.interpret("1+1", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(result.message().get(0).getData().contains("2")); @@ -150,6 +152,35 @@ public class SparkRInterpreterTest { assertTrue(result.message().get(0).getData().contains("sparkR backend is dead")); } + @Test + public void testInvalidR() throws InterpreterException { + tearDown(); + + Properties properties = new Properties(); + properties.setProperty("zeppelin.R.cmd", "invalid_r"); + properties.setProperty("spark.master", "local"); + properties.setProperty("spark.app.name", "test"); + + InterpreterGroup interpreterGroup = new InterpreterGroup(); + Interpreter sparkRInterpreter = new LazyOpenInterpreter(new SparkRInterpreter(properties)); + Interpreter sparkInterpreter = new LazyOpenInterpreter(new SparkInterpreter(properties)); + interpreterGroup.addInterpreterToSession(sparkRInterpreter, "session_1"); + interpreterGroup.addInterpreterToSession(sparkInterpreter, "session_1"); + sparkRInterpreter.setInterpreterGroup(interpreterGroup); + sparkInterpreter.setInterpreterGroup(interpreterGroup); + + InterpreterContext context = getInterpreterContext(); + InterpreterContext.set(context); + + try { + sparkRInterpreter.interpret("1+1", getInterpreterContext()); + fail("Should fail to open SparkRInterpreter"); + } catch (InterpreterException e) { + String stacktrace = ExceptionUtils.getStackTrace(e); + assertTrue(stacktrace, stacktrace.contains("No such file or directory")); + } + } + private InterpreterContext getInterpreterContext() { InterpreterContext context = InterpreterContext.builder() .setNoteId("note_1") diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala 
b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 11ae461..8750011 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -321,7 +321,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, z = new SparkZeppelinContext(sc, sparkShims, interpreterGroup.getInterpreterHookRegistry, - properties.getProperty("zeppelin.spark.maxResult").toInt) + properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java index 5e6c8c9..d544211 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java @@ -59,11 +59,19 @@ public abstract class ProcessLauncher implements ExecuteResultHandler { Map<String, String> envs) { this.commandLine = commandLine; this.envs = envs; + this.processOutput = new ProcessLogOutputStream(); + } + + public ProcessLauncher(CommandLine commandLine, + Map<String, String> envs, + ProcessLogOutputStream processLogOutput) { + this.commandLine = commandLine; + this.envs = envs; + this.processOutput = processLogOutput; } public void launch() { DefaultExecutor executor = new DefaultExecutor(); - this.processOutput = new ProcessLogOutputStream(); executor.setStreamHandler(new PumpStreamHandler(processOutput)); this.watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); executor.setWatchdog(watchdog); @@ -140,7 +148,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler { 
processOutput.stopCatchLaunchOutput(); } - class ProcessLogOutputStream extends LogOutputStream { + public static class ProcessLogOutputStream extends LogOutputStream { private boolean catchLaunchOutput = true; private StringBuilder launchOutput = new StringBuilder();