Repository: zeppelin
Updated Branches:
  refs/heads/master 5499fc4e9 -> e30fe73e9


ZEPPELIN-3337. Add more test to SparkRInterpreter

### What is this PR for?
Add more tests for SparkRInterpreter, along with some code refactoring in 
SparkRInterpreter.  Also fix a bug in SparkRInterpreter where it could not be 
cancelled.

### What type of PR is it?
[ Improvement | Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3337

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2871 from zjffdu/ZEPPELIN-3337 and squashes the following commits:

6cd91d5 [Jeff Zhang] ZEPPELIN-3337. Add more test to SparkRInterpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e30fe73e
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e30fe73e
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e30fe73e

Branch: refs/heads/master
Commit: e30fe73e9a3bcd9cb14f02915883761894ceb2e4
Parents: 5499fc4
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Mar 15 12:38:01 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Thu Mar 15 22:23:55 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/spark/SparkRInterpreter.java       | 44 +++++----------
 .../zeppelin/spark/SparkRInterpreterTest.java   | 59 +++++++++++++++-----
 testing/install_external_dependencies.sh        |  1 +
 .../interpreter/InterpreterContext.java         | 11 +++-
 .../interpreter/SparkIntegrationTest.java       |  2 +-
 5 files changed, 70 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 44f71b7..896f3a1 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -46,8 +46,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class SparkRInterpreter extends Interpreter {
   private static final Logger logger = 
LoggerFactory.getLogger(SparkRInterpreter.class);
 
-  private static String renderOptions;
+  private String renderOptions;
   private SparkInterpreter sparkInterpreter;
+  private boolean isSpark2;
   private ZeppelinR zeppelinR;
   private AtomicBoolean rbackendDead = new AtomicBoolean(false);
   private SparkContext sc;
@@ -75,6 +76,7 @@ public class SparkRInterpreter extends Interpreter {
       sparkRLibPath = "sparkr";
     }
 
+    // Share the same SparkRBackend across sessions
     synchronized (SparkRBackend.backend()) {
       if (!SparkRBackend.isStarted()) {
         SparkRBackend.init();
@@ -86,12 +88,13 @@ public class SparkRInterpreter extends Interpreter {
     this.sparkInterpreter = getSparkInterpreter();
     this.sc = sparkInterpreter.getSparkContext();
     this.jsc = sparkInterpreter.getJavaSparkContext();
+    SparkVersion sparkVersion = new SparkVersion(sc.version());
+    this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);
     int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 
6000);
 
-    SparkVersion sparkVersion = new SparkVersion(sc.version());
     ZeppelinRContext.setSparkContext(sc);
     ZeppelinRContext.setJavaSparkContext(jsc);
-    if (Utils.isSpark2()) {
+    if (isSpark2) {
       ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
     }
     ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
@@ -101,37 +104,28 @@ public class SparkRInterpreter extends Interpreter {
     try {
       zeppelinR.open();
     } catch (IOException e) {
-      logger.error("Exception while opening SparkRInterpreter", e);
-      throw new InterpreterException(e);
+      throw new InterpreterException("Exception while opening 
SparkRInterpreter", e);
     }
 
     if (useKnitr()) {
       zeppelinR.eval("library('knitr')");
     }
-    renderOptions = getProperty("zeppelin.R.render.options");
-  }
-
-  String getJobGroup(InterpreterContext context){
-    return "zeppelin-" + context.getParagraphId();
+    renderOptions = getProperty("zeppelin.R.render.options",
+        "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', 
message = F, " +
+            "warning = F, fig.retina = 2");
   }
 
   @Override
   public InterpreterResult interpret(String lines, InterpreterContext 
interpreterContext)
       throws InterpreterException {
 
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
     sparkInterpreter.populateSparkWebUrl(interpreterContext);
-    if (sparkInterpreter.isUnsupportedSparkVersion()) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Spark "
-          + sparkInterpreter.getSparkVersion().toString() + " is not 
supported");
-    }
-
     String jobGroup = Utils.buildJobGroupId(interpreterContext);
     String jobDesc = "Started by: " +
        Utils.getUserName(interpreterContext.getAuthenticationInfo());
     sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
 
-    String imageWidth = getProperty("zeppelin.R.image.width");
+    String imageWidth = getProperty("zeppelin.R.image.width", "100%");
 
     String[] sl = lines.split("\n");
     if (sl[0].contains("{") && sl[0].contains("}")) {
@@ -152,14 +146,13 @@ public class SparkRInterpreter extends Interpreter {
 
     String setJobGroup = "";
     // assign setJobGroup to dummy__, otherwise it would print NULL for this 
statement
-    if (Utils.isSpark2()) {
+    if (isSpark2) {
       setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
           "\", \" +" + jobDesc + "\", TRUE)";
     } else if 
(getSparkInterpreter().getSparkVersion().newerThanEquals(SparkVersion.SPARK_1_5_0))
 {
       setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
           "\", \"" + jobDesc + "\", TRUE)";
     }
-    logger.debug("set JobGroup:" + setJobGroup);
     lines = setJobGroup + "\n" + lines;
 
     try {
@@ -190,11 +183,6 @@ public class SparkRInterpreter extends Interpreter {
     } catch (Exception e) {
       logger.error("Exception while connecting to R", e);
       return new InterpreterResult(InterpreterResult.Code.ERROR, 
e.getMessage());
-    } finally {
-      try {
-      } catch (Exception e) {
-        // Do nothing...
-      }
     }
   }
 
@@ -206,7 +194,7 @@ public class SparkRInterpreter extends Interpreter {
   @Override
   public void cancel(InterpreterContext context) {
     if (this.sc != null) {
-      sc.cancelJobGroup(getJobGroup(context));
+      sc.cancelJobGroup(Utils.buildJobGroupId(context));
     }
   }
 
@@ -256,11 +244,7 @@ public class SparkRInterpreter extends Interpreter {
   }
 
   private boolean useKnitr() {
-    try {
-      return Boolean.parseBoolean(getProperty("zeppelin.R.knitr"));
-    } catch (Exception e) {
-      return false;
-    }
+    return Boolean.parseBoolean(getProperty("zeppelin.R.knitr", "true"));
   }
 
   public AtomicBoolean getRbackendDead() {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index bcdd876..53f29c3 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -35,6 +35,7 @@ import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
@@ -84,6 +85,30 @@ public class SparkRInterpreterTest {
       assertTrue(result.message().get(0).getData().contains("eruptions 
waiting"));
       // spark job url is sent
       verify(mockRemoteEventClient, 
atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), 
any(Map.class));
+
+      // cancel
+      final InterpreterContext context = getInterpreterContext();
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            InterpreterResult result = sparkRInterpreter.interpret("ldf <- 
dapplyCollect(\n" +
+                "         df,\n" +
+                "         function(x) {\n" +
+                "           Sys.sleep(3)\n" +
+                "           x <- cbind(x, \"waiting_secs\" = x$waiting * 
60)\n" +
+                "         })\n" +
+                "head(ldf, 3)", context);
+            
assertTrue(result.message().get(0).getData().contains("cancelled"));
+          } catch (InterpreterException e) {
+            fail("Should not throw InterpreterException");
+          }
+        }
+      };
+      thread.setName("Cancel-Thread");
+      thread.start();
+      Thread.sleep(1000);
+      sparkRInterpreter.cancel(context);
     } else {
       // spark 1.x
       result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, 
faithful)\nhead(df)", getInterpreterContext());
@@ -93,6 +118,20 @@ public class SparkRInterpreterTest {
       verify(mockRemoteEventClient, 
atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), 
any(Map.class));
     }
 
+    // plotting
+    result = sparkRInterpreter.interpret("hist(mtcars$mpg)", 
getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+    assertEquals(InterpreterResult.Type.HTML, 
result.message().get(0).getType());
+    assertTrue(result.message().get(0).getData().contains("<img src="));
+
+    result = sparkRInterpreter.interpret("library(ggplot2)\n" +
+        "ggplot(diamonds, aes(x=carat, y=price, color=cut)) + geom_point()", 
getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+    assertEquals(InterpreterResult.Type.HTML, 
result.message().get(0).getType());
+    assertTrue(result.message().get(0).getData().contains("<img src="));
+
     // sparkr backend would be timeout after 10 seconds
     Thread.sleep(15 * 1000);
     result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
@@ -101,21 +140,11 @@ public class SparkRInterpreterTest {
   }
 
   private InterpreterContext getInterpreterContext() {
-    InterpreterContext context = new InterpreterContext(
-        "noteId",
-        "paragraphId",
-        "replName",
-        "paragraphTitle",
-        "paragraphText",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new GUI(),
-        new AngularObjectRegistry("spark", null),
-        null,
-        null,
-        null);
-    context.setClient(mockRemoteEventClient);
+    InterpreterContext context = InterpreterContext.builder()
+        .setNoteId("note_1")
+        .setParagraphId("paragraph_1")
+        .setEventClient(mockRemoteEventClient)
+        .build();
     return context;
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/testing/install_external_dependencies.sh
----------------------------------------------------------------------
diff --git a/testing/install_external_dependencies.sh 
b/testing/install_external_dependencies.sh
index a120d61..d0b0f63 100755
--- a/testing/install_external_dependencies.sh
+++ b/testing/install_external_dependencies.sh
@@ -30,6 +30,7 @@ if [[ "${SPARKR}" = "true" ]] ; then
     R -e "install.packages('evaluate', repos = 'http://cran.us.r-project.org', 
lib='~/R')"  > /dev/null 2>&1
     R -e "install.packages('base64enc', repos = 
'http://cran.us.r-project.org', lib='~/R')"  > /dev/null 2>&1
     R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', 
lib='~/R')"  > /dev/null 2>&1
+    R -e "install.packages('ggplot2', repos = 'http://cran.us.r-project.org', 
lib='~/R')"  > /dev/null 2>&1
   fi
 fi
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 6157d69..6ff90a3 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -83,11 +83,20 @@ public class InterpreterContext {
       return this;
     }
 
-    public InterpreterContext getContext() {
+    public Builder setEventClient(RemoteEventClientWrapper client) {
+      context.client = client;
+      return this;
+    }
+
+    public InterpreterContext build() {
       return context;
     }
   }
 
+  public static Builder builder() {
+    return new Builder();
+  }
+
   private InterpreterContext() {
 
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
index 50930a7..e981638 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
@@ -75,7 +75,7 @@ public class SparkIntegrationTest {
     interpreterSettingManager.setInterpreterBinding("user1", "note1", 
interpreterSettingManager.getInterpreterSettingIds());
     Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", 
"note1", "spark.spark");
 
-    InterpreterContext context = new 
InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
+    InterpreterContext context = new 
InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
     InterpreterResult interpreterResult = 
sparkInterpreter.interpret("sc.version", context);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
     String detectedSparkVersion = interpreterResult.message().get(0).getData();

Reply via email to