Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.8 48e1c4bba -> 7a8400e96
ZEPPELIN-2515. After 100 minutes R process quits silently and spark.r interpreter becomes unresponsive

### What is this PR for?
This PR adds 2 features:
1. Make the timeout of the SparkR backend configurable.
2. Detect when the R backend is dead and display a proper message to the frontend.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2515

### How should this be tested?
* A unit test is added.

### Screenshots (if appropriate)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2839 from zjffdu/ZEPPELIN-2515 and squashes the following commits:

62888c5 [Jeff Zhang] ZEPPELIN-2515. After 100 minutes R process quits silently and spark.r interpreter becomes unresponsive

(cherry picked from commit 8b2259cf5805f654b5da2725fa8207fa0b6d5f7e)
Signed-off-by: Jeff Zhang <zjf...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/7a8400e9
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/7a8400e9
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/7a8400e9

Branch: refs/heads/branch-0.8
Commit: 7a8400e9616327e6be86278bd4b01b112a3ef031
Parents: 48e1c4b
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Mar 6 15:02:19 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Wed Mar 14 20:54:47 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/spark/SparkRInterpreter.java       | 17 +++++++++--
 .../org/apache/zeppelin/spark/ZeppelinR.java    | 32 ++++++++++++++++++--
 .../src/main/resources/R/zeppelin_sparkr.R      |  3 +-
 .../zeppelin/spark/SparkRInterpreterTest.java   |  9 +++++-
 4 files changed, 54 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7a8400e9/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 5efff0e..44f71b7 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -27,6 +27,7 @@ import org.apache.spark.SparkRBackend;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
@@ -36,6 +37,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * R and SparkR interpreter with visualization support.
@@ -46,6 +49,7 @@ public class SparkRInterpreter extends Interpreter {
   private static String renderOptions;
   private SparkInterpreter sparkInterpreter;
   private ZeppelinR zeppelinR;
+  private AtomicBoolean rbackendDead = new AtomicBoolean(false);
   private SparkContext sc;
   private JavaSparkContext jsc;
 
@@ -79,10 +83,11 @@ public class SparkRInterpreter extends Interpreter {
     }
 
     int port = SparkRBackend.port();
-
     this.sparkInterpreter = getSparkInterpreter();
     this.sc = sparkInterpreter.getSparkContext();
     this.jsc = sparkInterpreter.getJavaSparkContext();
+    int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 6000);
+
     SparkVersion sparkVersion = new SparkVersion(sc.version());
     ZeppelinRContext.setSparkContext(sc);
     ZeppelinRContext.setJavaSparkContext(jsc);
@@ -92,7 +97,7 @@
     ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
     ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
 
-    zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion);
+    zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion, timeout, this);
     try {
       zeppelinR.open();
     } catch (IOException e) {
@@ -159,6 +164,10 @@
 
     try {
       // render output with knitr
+      if (rbackendDead.get()) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR,
+            "sparkR backend is dead, please try to increase spark.r.backendConnectionTimeout");
+      }
       if (useKnitr()) {
         zeppelinR.setInterpreterOutput(null);
         zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
@@ -253,4 +262,8 @@
       return false;
     }
   }
+
+  public AtomicBoolean getRbackendDead() {
+    return rbackendDead;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7a8400e9/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index 130d849..e481dbe 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -36,9 +36,12 @@ import java.util.Map;
  * R repl interaction
  */
 public class ZeppelinR implements ExecuteResultHandler {
-  Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
+  private static Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
+
+  private final SparkRInterpreter sparkRInterpreter;
   private final String rCmdPath;
   private final SparkVersion sparkVersion;
+  private final int timeout;
   private DefaultExecutor executor;
   private InterpreterOutputStream outputStream;
   private PipedOutputStream input;
@@ -108,11 +111,13 @@ public class ZeppelinR implements ExecuteResultHandler {
    * @param libPath sparkr library path
    */
   public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort,
-                   SparkVersion sparkVersion) {
+                   SparkVersion sparkVersion, int timeout, SparkRInterpreter sparkRInterpreter) {
     this.rCmdPath = rCmdPath;
     this.libPath = libPath;
     this.sparkVersion = sparkVersion;
     this.port = sparkRBackendPort;
+    this.timeout = timeout;
+    this.sparkRInterpreter = sparkRInterpreter;
     try {
       File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R");
       scriptPath = scriptFile.getAbsolutePath();
@@ -140,12 +145,13 @@ public class ZeppelinR implements ExecuteResultHandler {
     cmd.addArgument(Integer.toString(port));
     cmd.addArgument(libPath);
     cmd.addArgument(Integer.toString(sparkVersion.toNumber()));
+    cmd.addArgument(Integer.toString(timeout));
     // dump out the R command to facilitate manually running it, e.g. for fault diagnosis purposes
     logger.debug(cmd.toString());
 
     executor = new DefaultExecutor();
-    outputStream = new InterpreterOutputStream(logger);
+    outputStream = new SparkRInterpreterOutputStream(logger, sparkRInterpreter);
 
     input = new PipedOutputStream();
     PipedInputStream in = new PipedInputStream(input);
 
@@ -391,4 +397,24 @@ public class ZeppelinR implements ExecuteResultHandler {
     logger.error(e.getMessage(), e);
     rScriptRunning = false;
   }
+
+
+  public static class SparkRInterpreterOutputStream extends InterpreterOutputStream {
+
+    private SparkRInterpreter sparkRInterpreter;
+
+    public SparkRInterpreterOutputStream(Logger logger, SparkRInterpreter sparkRInterpreter) {
+      super(logger);
+      this.sparkRInterpreter = sparkRInterpreter;
+    }
+
+    @Override
+    protected void processLine(String s, int i) {
+      super.processLine(s, i);
+      if (s.contains("Java SparkR backend might have failed") // spark 2.x
+          || s.contains("Execution halted")) { // spark 1.x
+        sparkRInterpreter.getRbackendDead().set(true);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7a8400e9/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
index 525c6c5..16b8415 100644
--- a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
+++ b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
@@ -22,6 +22,7 @@ hashCode <- as.integer(args[1])
 port <- as.integer(args[2])
 libPath <- args[3]
 version <- as.integer(args[4])
+timeout <- as.integer(args[5])
 rm(args)
 
 print(paste("Port ", toString(port)))
@@ -31,7 +32,7 @@ print(paste("LibPath ", libPath))
 
 library(SparkR)
 
-SparkR:::connectBackend("localhost", port, 6000)
+SparkR:::connectBackend("localhost", port, timeout)
 
 # scStartTime is needed by R/pkg/R/sparkR.R
 assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7a8400e9/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 0bd88d4..bcdd876 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -47,7 +47,7 @@
   private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
 
   @Test
-  public void testSparkRInterpreter() throws IOException, InterruptedException, InterpreterException {
+  public void testSparkRInterpreter() throws InterpreterException, InterruptedException {
     Properties properties = new Properties();
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
@@ -55,6 +55,7 @@
     properties.setProperty("zeppelin.spark.test", "true");
     properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("zeppelin.R.knitr", "true");
+    properties.setProperty("spark.r.backendConnectionTimeout", "10");
 
     sparkRInterpreter = new SparkRInterpreter(properties);
     sparkInterpreter = new SparkInterpreter(properties);
@@ -91,6 +92,12 @@
       // spark job url is sent
       verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class),
           any(String.class), any(Map.class));
     }
+
+    // sparkr backend would be timeout after 10 seconds
+    Thread.sleep(15 * 1000);
+    result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().get(0).getData().contains("sparkR backend is dead"));
   }
 
   private InterpreterContext getInterpreterContext() {