Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.8 257db1d5a -> e91837cd5

ZEPPELIN-3337. Add more test to SparkRInterpreter

### What is this PR for?
Add more tests for SparkRInterpreter, along with some code refactoring in SparkRInterpreter. Also fix a bug in SparkRInterpreter so that it can be cancelled.

### What type of PR is it?
[Improvement | Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3337

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2871 from zjffdu/ZEPPELIN-3337 and squashes the following commits:

6cd91d5 [Jeff Zhang] ZEPPELIN-3337. Add more test to SparkRInterpreter

(cherry picked from commit e30fe73e9a3bcd9cb14f02915883761894ceb2e4)
Signed-off-by: Jeff Zhang <zjf...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e91837cd
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e91837cd
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e91837cd

Branch: refs/heads/branch-0.8
Commit: e91837cd5a22b47bcf5a75cda34067a1b1c2be6a
Parents: 257db1d
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Mar 15 12:38:01 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Thu Mar 15 22:26:59 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/spark/SparkRInterpreter.java       | 44 +++++----------
 .../zeppelin/spark/SparkRInterpreterTest.java   | 59 +++++++++++++++-----
 testing/install_external_dependencies.sh        |  1 +
 .../interpreter/InterpreterContext.java         | 11 +++-
 .../interpreter/SparkIntegrationTest.java       |  2 +-
 5 files changed, 70 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e91837cd/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 44f71b7..896f3a1 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -46,8 +46,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class SparkRInterpreter extends Interpreter {
   private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class);
 
-  private static String renderOptions;
+  private String renderOptions;
   private SparkInterpreter sparkInterpreter;
+  private boolean isSpark2;
   private ZeppelinR zeppelinR;
   private AtomicBoolean rbackendDead = new AtomicBoolean(false);
   private SparkContext sc;
@@ -75,6 +76,7 @@ public class SparkRInterpreter extends Interpreter {
       sparkRLibPath = "sparkr";
     }
 
+    // Share the same SparkRBackend across sessions
     synchronized (SparkRBackend.backend()) {
       if (!SparkRBackend.isStarted()) {
         SparkRBackend.init();
@@ -86,12 +88,13 @@ public class SparkRInterpreter extends Interpreter {
     this.sparkInterpreter = getSparkInterpreter();
     this.sc = sparkInterpreter.getSparkContext();
     this.jsc = sparkInterpreter.getJavaSparkContext();
+    SparkVersion sparkVersion = new SparkVersion(sc.version());
+    this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);
     int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 6000);
-    SparkVersion sparkVersion = new SparkVersion(sc.version());
     ZeppelinRContext.setSparkContext(sc);
     ZeppelinRContext.setJavaSparkContext(jsc);
-    if (Utils.isSpark2()) {
+    if (isSpark2) {
       ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
     }
     ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
@@ -101,37 +104,28 @@ public class SparkRInterpreter extends Interpreter {
     try {
       zeppelinR.open();
     } catch (IOException e) {
-      logger.error("Exception while opening SparkRInterpreter", e);
-      throw new InterpreterException(e);
+      throw new InterpreterException("Exception while opening SparkRInterpreter", e);
     }
 
     if (useKnitr()) {
       zeppelinR.eval("library('knitr')");
     }
-    renderOptions = getProperty("zeppelin.R.render.options");
-  }
-
-  String getJobGroup(InterpreterContext context){
-    return "zeppelin-" + context.getParagraphId();
+    renderOptions = getProperty("zeppelin.R.render.options",
+        "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, " +
+        "warning = F, fig.retina = 2");
   }
 
   @Override
   public InterpreterResult interpret(String lines, InterpreterContext interpreterContext)
       throws InterpreterException {
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
     sparkInterpreter.populateSparkWebUrl(interpreterContext);
-    if (sparkInterpreter.isUnsupportedSparkVersion()) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Spark "
-          + sparkInterpreter.getSparkVersion().toString() + " is not supported");
-    }
-
     String jobGroup = Utils.buildJobGroupId(interpreterContext);
     String jobDesc = "Started by: " +
         Utils.getUserName(interpreterContext.getAuthenticationInfo());
     sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
 
-    String imageWidth = getProperty("zeppelin.R.image.width");
+    String imageWidth = getProperty("zeppelin.R.image.width", "100%");
 
     String[] sl = lines.split("\n");
     if (sl[0].contains("{") && sl[0].contains("}")) {
@@ -152,14 +146,13 @@ public class SparkRInterpreter extends Interpreter {
 
     String setJobGroup = "";
     // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
-    if (Utils.isSpark2()) {
+    if (isSpark2) {
       setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
          "\", \" +" + jobDesc + "\", TRUE)";
     } else if (getSparkInterpreter().getSparkVersion().newerThanEquals(SparkVersion.SPARK_1_5_0)) {
       setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
          "\", \"" + jobDesc + "\", TRUE)";
     }
-    logger.debug("set JobGroup:" + setJobGroup);
     lines = setJobGroup + "\n" + lines;
 
     try {
@@ -190,11 +183,6 @@ public class SparkRInterpreter extends Interpreter {
     } catch (Exception e) {
       logger.error("Exception while connecting to R", e);
       return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
-    } finally {
-      try {
-      } catch (Exception e) {
-        // Do nothing...
-      }
     }
   }
 
@@ -206,7 +194,7 @@ public class SparkRInterpreter extends Interpreter {
   @Override
   public void cancel(InterpreterContext context) {
     if (this.sc != null) {
-      sc.cancelJobGroup(getJobGroup(context));
+      sc.cancelJobGroup(Utils.buildJobGroupId(context));
     }
   }
 
@@ -256,11 +244,7 @@ public class SparkRInterpreter extends Interpreter {
   }
 
   private boolean useKnitr() {
-    try {
-      return Boolean.parseBoolean(getProperty("zeppelin.R.knitr"));
-    } catch (Exception e) {
-      return false;
-    }
+    return Boolean.parseBoolean(getProperty("zeppelin.R.knitr", "true"));
   }
 
   public AtomicBoolean getRbackendDead() {
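
For context on the cancellation fix: interpret() tags each paragraph's Spark jobs with the ID returned by Utils.buildJobGroupId(interpreterContext), while cancel() previously used the now-removed getJobGroup() helper, which built its ID from the paragraph ID alone and so could disagree with the ID interpret() had registered. When the two IDs disagree, sc.cancelJobGroup() targets a group no job was ever tagged with, and cancellation silently does nothing. The standalone sketch below illustrates the job-group contract both sides must agree on; it is not Zeppelin code, and the class name, group ID, and timings are all illustrative:

  // Sketch: Spark job-group cancellation. The group ID passed to
  // cancelJobGroup() must match the one used at submission time.
  import java.util.Arrays;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;

  public class JobGroupCancelDemo {
    public static void main(String[] args) throws InterruptedException {
      SparkConf conf = new SparkConf().setAppName("job-group-demo").setMaster("local[2]");
      try (JavaSparkContext sc = new JavaSparkContext(conf)) {
        Thread worker = new Thread(() -> {
          // Job groups are per-thread: tag jobs from the submitting thread.
          sc.setJobGroup("demo-group", "long running job", true); // interruptOnCancel = true
          try {
            sc.parallelize(Arrays.asList(1, 2, 3, 4), 4)
                .map(x -> { Thread.sleep(60_000); return x; })
                .collect();
          } catch (Exception e) {
            // A cancelled group surfaces as a SparkException from collect().
            System.out.println("job was cancelled: " + e.getMessage());
          }
        });
        worker.start();
        Thread.sleep(3_000);
        // Takes effect only because "demo-group" matches the submission-time ID --
        // the mismatch fixed in cancel() above.
        sc.cancelJobGroup("demo-group");
        worker.join();
      }
    }
  }

The new test in SparkRInterpreterTest below exercises this path end to end: a background thread runs a slow dapplyCollect() while the main thread calls cancel() after one second, and the interpreter output is expected to contain "cancelled".
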
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e91837cd/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index bcdd876..53f29c3 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -35,6 +35,7 @@ import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
@@ -84,6 +85,30 @@ public class SparkRInterpreterTest {
       assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
       // spark job url is sent
       verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+
+      // cancel
+      final InterpreterContext context = getInterpreterContext();
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" +
+                "         df,\n" +
+                "         function(x) {\n" +
+                "           Sys.sleep(3)\n" +
+                "           x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" +
+                "         })\n" +
+                "head(ldf, 3)", context);
+            assertTrue(result.message().get(0).getData().contains("cancelled"));
+          } catch (InterpreterException e) {
+            fail("Should not throw InterpreterException");
+          }
+        }
+      };
+      thread.setName("Cancel-Thread");
+      thread.start();
+      Thread.sleep(1000);
+      sparkRInterpreter.cancel(context);
     } else {
       // spark 1.x
       result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
@@ -93,6 +118,20 @@
       verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
     }
 
+    // plotting
+    result = sparkRInterpreter.interpret("hist(mtcars$mpg)", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+    assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());
+    assertTrue(result.message().get(0).getData().contains("<img src="));
+
+    result = sparkRInterpreter.interpret("library(ggplot2)\n" +
+        "ggplot(diamonds, aes(x=carat, y=price, color=cut)) + geom_point()", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+    assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());
+    assertTrue(result.message().get(0).getData().contains("<img src="));
+
     // sparkr backend would be timeout after 10 seconds
     Thread.sleep(15 * 1000);
     result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
@@ -101,21 +140,11 @@
   }
 
   private InterpreterContext getInterpreterContext() {
-    InterpreterContext context = new InterpreterContext(
-        "noteId",
-        "paragraphId",
-        "replName",
-        "paragraphTitle",
-        "paragraphText",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new GUI(),
-        new AngularObjectRegistry("spark", null),
-        null,
-        null,
-        null);
-    context.setClient(mockRemoteEventClient);
+    InterpreterContext context = InterpreterContext.builder()
+        .setNoteId("note_1")
+        .setParagraphId("paragraph_1")
+        .setEventClient(mockRemoteEventClient)
+        .build();
     return context;
   }
 }


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e91837cd/testing/install_external_dependencies.sh
----------------------------------------------------------------------
diff --git a/testing/install_external_dependencies.sh b/testing/install_external_dependencies.sh
index a120d61..d0b0f63 100755
--- a/testing/install_external_dependencies.sh
+++ b/testing/install_external_dependencies.sh
@@ -30,6 +30,7 @@ if [[ "${SPARKR}" = "true" ]] ; then
     R -e "install.packages('evaluate', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1
     R -e "install.packages('base64enc', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1
     R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1
+    R -e "install.packages('ggplot2', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1
   fi
 fi


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e91837cd/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 8fa0904..dec5adb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -83,11 +83,20 @@ public class InterpreterContext {
       return this;
     }
 
-    public InterpreterContext getContext() {
+    public Builder setEventClient(RemoteEventClientWrapper client) {
+      context.client = client;
+      return this;
+    }
+
+    public InterpreterContext build() {
       return context;
     }
   }
 
+  public static Builder builder() {
+    return new Builder();
+  }
+
   private InterpreterContext() {
 
   }
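
The InterpreterContext changes above follow the conventional builder idiom: the terminal method is renamed from getContext() to build(), a static builder() factory is added alongside direct construction of InterpreterContext.Builder, and setEventClient() lets tests wire in a mock event client instead of the long positional constructor deleted from SparkRInterpreterTest. A minimal usage sketch, with identifiers taken from the diffs above (mockRemoteEventClient is assumed to be the Mockito mock declared in the test):

  // Sketch: building a context fluently; ends with build(), not getContext().
  InterpreterContext context = InterpreterContext.builder()
      .setNoteId("note_1")
      .setParagraphId("paragraph_1")
      .setEventClient(mockRemoteEventClient)
      .build();
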
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e91837cd/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
index 50930a7..e981638 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
@@ -75,7 +75,7 @@ public class SparkIntegrationTest {
     interpreterSettingManager.setInterpreterBinding("user1", "note1",
         interpreterSettingManager.getInterpreterSettingIds());
     Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
-    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
+    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
     InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
     String detectedSparkVersion = interpreterResult.message().get(0).getData();
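
One refactoring pattern recurs throughout the SparkRInterpreter changes: ad-hoc defaulting is replaced with the two-argument getProperty(key, defaultValue) overload, as in getProperty("zeppelin.R.image.width", "100%") and getProperty("zeppelin.R.knitr", "true"). Note that the old useKnitr() wrapped Boolean.parseBoolean() in a try/catch, but parseBoolean() never throws (it returns false for null or unparsable input), so the catch block was dead code; the real behavioral change is that knitr is now enabled by default. A condensed sketch of the pattern, assuming getProperty(key, default) returns the default when the key is unset:

  // Sketch: defaulted property reads, as adopted in this change.
  String imageWidth = getProperty("zeppelin.R.image.width", "100%");
  boolean useKnitr = Boolean.parseBoolean(getProperty("zeppelin.R.knitr", "true"));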