Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.8 48e1c4bba -> 7a8400e96


ZEPPELIN-2515. After 100 minutes R process quits silently and spark.r 
interpreter becomes unresponsive

### What is this PR for?

This PR adds 2 features:
1. Make the timeout of the SparkR backend configurable.
2. Detect when the R backend is dead and display a proper message to the frontend.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2515

### How should this be tested?
* unit test is added

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2839 from zjffdu/ZEPPELIN-2515 and squashes the following commits:

62888c5 [Jeff Zhang] ZEPPELIN-2515. After 100 minutes R process quits silently 
and spark.r interpreter becomes unresponsive

(cherry picked from commit 8b2259cf5805f654b5da2725fa8207fa0b6d5f7e)
Signed-off-by: Jeff Zhang <zjf...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/7a8400e9
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/7a8400e9
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/7a8400e9

Branch: refs/heads/branch-0.8
Commit: 7a8400e9616327e6be86278bd4b01b112a3ef031
Parents: 48e1c4b
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Mar 6 15:02:19 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Wed Mar 14 20:54:47 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/spark/SparkRInterpreter.java       | 17 +++++++++--
 .../org/apache/zeppelin/spark/ZeppelinR.java    | 32 ++++++++++++++++++--
 .../src/main/resources/R/zeppelin_sparkr.R      |  3 +-
 .../zeppelin/spark/SparkRInterpreterTest.java   |  9 +++++-
 4 files changed, 54 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7a8400e9/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 5efff0e..44f71b7 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -27,6 +27,7 @@ import org.apache.spark.SparkRBackend;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
@@ -36,6 +37,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * R and SparkR interpreter with visualization support.
@@ -46,6 +49,7 @@ public class SparkRInterpreter extends Interpreter {
   private static String renderOptions;
   private SparkInterpreter sparkInterpreter;
   private ZeppelinR zeppelinR;
+  private AtomicBoolean rbackendDead = new AtomicBoolean(false);
   private SparkContext sc;
   private JavaSparkContext jsc;
 
@@ -79,10 +83,11 @@ public class SparkRInterpreter extends Interpreter {
     }
 
     int port = SparkRBackend.port();
-
     this.sparkInterpreter = getSparkInterpreter();
     this.sc = sparkInterpreter.getSparkContext();
     this.jsc = sparkInterpreter.getJavaSparkContext();
+    int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 
6000);
+
     SparkVersion sparkVersion = new SparkVersion(sc.version());
     ZeppelinRContext.setSparkContext(sc);
     ZeppelinRContext.setJavaSparkContext(jsc);
@@ -92,7 +97,7 @@ public class SparkRInterpreter extends Interpreter {
     ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
     ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
 
-    zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion);
+    zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion, 
timeout, this);
     try {
       zeppelinR.open();
     } catch (IOException e) {
@@ -159,6 +164,10 @@ public class SparkRInterpreter extends Interpreter {
 
     try {
       // render output with knitr
+      if (rbackendDead.get()) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR,
+            "sparkR backend is dead, please try to increase 
spark.r.backendConnectionTimeout");
+      }
       if (useKnitr()) {
         zeppelinR.setInterpreterOutput(null);
         zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + 
"\n```");
@@ -253,4 +262,8 @@ public class SparkRInterpreter extends Interpreter {
       return false;
     }
   }
+
+  public AtomicBoolean getRbackendDead() {
+    return rbackendDead;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7a8400e9/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index 130d849..e481dbe 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -36,9 +36,12 @@ import java.util.Map;
  * R repl interaction
  */
 public class ZeppelinR implements ExecuteResultHandler {
-  Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
+  private static Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
+
+  private final SparkRInterpreter sparkRInterpreter;
   private final String rCmdPath;
   private final SparkVersion sparkVersion;
+  private final int timeout;
   private DefaultExecutor executor;
   private InterpreterOutputStream outputStream;
   private PipedOutputStream input;
@@ -108,11 +111,13 @@ public class ZeppelinR implements ExecuteResultHandler {
    * @param libPath sparkr library path
    */
   public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort,
-      SparkVersion sparkVersion) {
+      SparkVersion sparkVersion, int timeout, SparkRInterpreter 
sparkRInterpreter) {
     this.rCmdPath = rCmdPath;
     this.libPath = libPath;
     this.sparkVersion = sparkVersion;
     this.port = sparkRBackendPort;
+    this.timeout = timeout;
+    this.sparkRInterpreter = sparkRInterpreter;
     try {
       File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R");
       scriptPath = scriptFile.getAbsolutePath();
@@ -140,12 +145,13 @@ public class ZeppelinR implements ExecuteResultHandler {
     cmd.addArgument(Integer.toString(port));
     cmd.addArgument(libPath);
     cmd.addArgument(Integer.toString(sparkVersion.toNumber()));
+    cmd.addArgument(Integer.toString(timeout));
     
     // dump out the R command to facilitate manually running it, e.g. for 
fault diagnosis purposes
     logger.debug(cmd.toString());
 
     executor = new DefaultExecutor();
-    outputStream = new InterpreterOutputStream(logger);
+    outputStream = new SparkRInterpreterOutputStream(logger, 
sparkRInterpreter);
 
     input = new PipedOutputStream();
     PipedInputStream in = new PipedInputStream(input);
@@ -391,4 +397,24 @@ public class ZeppelinR implements ExecuteResultHandler {
     logger.error(e.getMessage(), e);
     rScriptRunning = false;
   }
+
+
+  public static class SparkRInterpreterOutputStream extends 
InterpreterOutputStream {
+
+    private SparkRInterpreter sparkRInterpreter;
+
+    public SparkRInterpreterOutputStream(Logger logger, SparkRInterpreter 
sparkRInterpreter) {
+      super(logger);
+      this.sparkRInterpreter = sparkRInterpreter;
+    }
+
+    @Override
+    protected void processLine(String s, int i) {
+      super.processLine(s, i);
+      if (s.contains("Java SparkR backend might have failed") // spark 2.x
+          || s.contains("Execution halted")) { // spark 1.x
+        sparkRInterpreter.getRbackendDead().set(true);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7a8400e9/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R 
b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
index 525c6c5..16b8415 100644
--- a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
+++ b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
@@ -22,6 +22,7 @@ hashCode <- as.integer(args[1])
 port <- as.integer(args[2])
 libPath <- args[3]
 version <- as.integer(args[4])
+timeout <- as.integer(args[5])
 rm(args)
 
 print(paste("Port ", toString(port)))
@@ -31,7 +32,7 @@ print(paste("LibPath ", libPath))
 library(SparkR)
 
 
-SparkR:::connectBackend("localhost", port, 6000)
+SparkR:::connectBackend("localhost", port, timeout)
 
 # scStartTime is needed by R/pkg/R/sparkR.R
 assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7a8400e9/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 0bd88d4..bcdd876 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -47,7 +47,7 @@ public class SparkRInterpreterTest {
   private RemoteEventClient mockRemoteEventClient = 
mock(RemoteEventClient.class);
 
   @Test
-  public void testSparkRInterpreter() throws IOException, 
InterruptedException, InterpreterException {
+  public void testSparkRInterpreter() throws InterpreterException, 
InterruptedException {
     Properties properties = new Properties();
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
@@ -55,6 +55,7 @@ public class SparkRInterpreterTest {
     properties.setProperty("zeppelin.spark.test", "true");
     properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("zeppelin.R.knitr", "true");
+    properties.setProperty("spark.r.backendConnectionTimeout", "10");
 
     sparkRInterpreter = new SparkRInterpreter(properties);
     sparkInterpreter = new SparkInterpreter(properties);
@@ -91,6 +92,12 @@ public class SparkRInterpreterTest {
       // spark job url is sent
       verify(mockRemoteEventClient, 
atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), 
any(Map.class));
     }
+
+    // sparkr backend would be timeout after 10 seconds
+    Thread.sleep(15 * 1000);
+    result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().get(0).getData().contains("sparkR backend is 
dead"));
   }
 
   private InterpreterContext getInterpreterContext() {

Reply via email to