This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new 9320d14  [ZEPPELIN-4474]. Move r interpreter of spark into a separated interpreter
9320d14 is described below

commit 9320d1493fb13e656cc57a6bab8542a0027e1814
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Mon Dec 9 13:57:23 2019 +0800

    [ZEPPELIN-4474]. Move r interpreter of spark into a separated interpreter

    ### What is this PR for?
    This PR moves the R interpreter of Spark into a separate interpreter, `r`, so that users can use R in its native form rather than always having to go through the Spark interpreter.

    ### What type of PR is it?
    [Feature | Refactoring]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4474

    ### How should this be tested?
    * CI passes

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #3552 from zjffdu/ZEPPELIN-4474 and squashes the following commits:

    6574f86cb [Jeff Zhang] [ZEPPELIN-4474]. Move r interpreter of spark into a separated interpreter
---
 pom.xml                                            | 1 +
 rlang/pom.xml                                      | 188 ++++++++++++++++++
 .../java/org/apache/zeppelin/r/RInterpreter.java   | 205 ++++++++++++++++++++
 .../org/apache/zeppelin/r/RZeppelinContext.java    | 49 +++++
 .../java/org/apache/zeppelin/r/SparkRBackend.java  | 90 +++++++++
 .../java/org/apache/zeppelin/r}/ZeppelinR.java     | 215 ++++++++++-----------
 .../org/apache/zeppelin/r/ZeppelinRDisplay.java    | 147 ++++++++++++++
 .../src/main/resources/R/zeppelin_sparkr.R         | 35 ++--
 rlang/src/main/resources/interpreter-setting.json  | 37 ++++
 .../org/apache/zeppelin/r/RInterpreterTest.java    | 131 +++++++++++++
 rlang/src/test/resources/log4j.properties          | 26 +++
 spark/interpreter/pom.xml                          | 12 ++
 .../apache/zeppelin/spark/SparkRInterpreter.java   | 127 +++---------
 .../scala/org/apache/spark/SparkRBackend.scala     | 61 ------
 .../apache/zeppelin/spark/ZeppelinRDisplay.scala   | 117 -----------
 .../apache/zeppelin/spark/utils/DisplayUtils.scala | 90 ---------
 .../zeppelin/spark/SparkRInterpreterTest.java      | 2 -
 .../spark/utils/DisplayFunctionsTest.scala         | 173 -----------------
 zeppelin-interpreter-integration/pom.xml           | 4 +
 19 files changed, 1039 insertions(+), 671 deletions(-)

diff --git a/pom.xml b/pom.xml index cbd9b14..bac85ab 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,7 @@ <module>zeppelin-interpreter-api</module> <module>zeppelin-zengine</module> <module>zeppelin-display</module> + <module>rlang</module> <module>kotlin</module> <module>groovy</module> <module>spark</module> diff --git a/rlang/pom.xml b/rlang/pom.xml new file mode 100644 index 0000000..5f68e19 --- /dev/null +++ b/rlang/pom.xml @@ -0,0 +1,188 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin-interpreter-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>r</artifactId> + <packaging>jar</packaging> + <version>0.9.0-SNAPSHOT</version> + <name>Zeppelin: R</name> + <description>Zeppelin R support</description> + + <properties> + <interpreter.name>r</interpreter.name> + <jsoup.version>1.12.1</jsoup.version> + <spark.version>2.4.3</spark.version> + + <spark.archive>spark-${spark.version}</spark.archive> + <spark.src.download.url> + https://archive.apache.org/dist/spark/${spark.archive}/${spark.archive}.tgz + </spark.src.download.url> + <spark.bin.download.url> + https://archive.apache.org/dist/spark/${spark.archive}/${spark.archive}-bin-without-hadoop.tgz + </spark.bin.download.url> + <interpreter.jar.name>zeppelin-interpreter-r</interpreter.jar.name> + </properties> + + <dependencies> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.1</version> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.jsoup</groupId> + <artifactId>jsoup</artifactId> + <version>${jsoup.version}</version> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + + <!-- include sparkr by default --> + <plugin> + <groupId>com.googlecode.maven-download-plugin</groupId> + <artifactId>download-maven-plugin</artifactId> + <executions> + <execution> + <id>download-sparkr-files</id> + <phase>validate</phase> + <goals> + <goal>wget</goal> + </goals> + <configuration> + <readTimeOut>60000</readTimeOut> + <retries>5</retries> + <url>${spark.bin.download.url}</url> + <unpack>true</unpack> + <outputDirectory>${project.build.directory}</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>2.7</version> + <executions> + <execution> + <id>copy-sparkr-files</id> + <phase>generate-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/r/R/lib</outputDirectory> + <resources> + <resource> + <directory> + ${project.build.directory}/spark-${spark.version}-bin-without-hadoop/R/lib + </directory> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${plugin.shade.version}</version> + <configuration> + <filters> + <filter> + 
<artifact>*:*</artifact> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>reference.conf</resource> + </transformer> + </transformers> + <artifactSet> + <excludes> + <exclude>org.apache.zeppelin:zeppelin-interpreter-api</exclude> + </excludes> + </artifactSet> + <outputFile>${project.build.directory}/../../interpreter/rlang/${interpreter.jar.name}-${project.version}.jar</outputFile> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine> + <environmentVariables> + <ZEPPELIN_HOME>${basedir}/../</ZEPPELIN_HOME> + </environmentVariables> + </configuration> + </plugin> + + </plugins> + </build> +</project> diff --git a/rlang/src/main/java/org/apache/zeppelin/r/RInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/RInterpreter.java new file mode 100644 index 0000000..e3810ef --- /dev/null +++ b/rlang/src/main/java/org/apache/zeppelin/r/RInterpreter.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.r; + +import org.apache.zeppelin.interpreter.AbstractInterpreter; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * R interpreter with visualization support. 
+ */ +public class RInterpreter extends AbstractInterpreter { + private static final Logger LOGGER = LoggerFactory.getLogger(RInterpreter.class); + private static RZeppelinContext z; + + private SparkRBackend sparkRBackend; + private ZeppelinR zeppelinR; + private String renderOptions; + private boolean useKnitr; + private AtomicBoolean rbackendDead = new AtomicBoolean(false); + + public RInterpreter(Properties property) { + super(property); + } + + /** + * RInterpreter just use spark-core for the communication between R process and jvm process. + * SparkContext is not created in this RInterpreter. + * Sub class can override this, e.g. SparkRInterpreter + * @return + */ + protected boolean isSparkSupported() { + return false; + } + + /** + * The spark version specified in pom.xml + * Sub class can override this, e.g. SparkRInterpreter + * @return + */ + protected int sparkVersion() { + return 20403; + } + + /** + * Spark 2.4.3 need secret for socket communication between R process and jvm process. + * Sub class can override this, e.g. SparkRInterpreter + * @return + */ + protected boolean isSecretSupported() { + return true; + } + + @Override + public void open() throws InterpreterException { + this.sparkRBackend = SparkRBackend.get(); + // Share the same SparkRBackend across sessions + synchronized (sparkRBackend) { + if (!sparkRBackend.isStarted()) { + try { + sparkRBackend.init(isSecretSupported()); + } catch (Exception e) { + throw new InterpreterException("Fail to init SparkRBackend", e); + } + sparkRBackend.start(); + } + } + + synchronized (RInterpreter.class) { + if (this.z == null) { + z = new RZeppelinContext(getInterpreterGroup().getInterpreterHookRegistry(), + Integer.parseInt(getProperty("zeppelin.r.maxResult", "1000"))); + } + } + this.renderOptions = getProperty("zeppelin.R.render.options", + "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, " + + "warning = F, fig.retina = 2"); + this.useKnitr = Boolean.parseBoolean(getProperty("zeppelin.R.knitr", "true")); + zeppelinR = new ZeppelinR(this); + try { + zeppelinR.open(); + LOGGER.info("ZeppelinR is opened successfully."); + } catch (IOException e) { + throw new InterpreterException("Exception while opening SparkRInterpreter", e); + } + + if (useKnitr) { + zeppelinR.eval("library('knitr')"); + } + } + + @Override + public InterpreterResult internalInterpret(String lines, InterpreterContext interpreterContext) + throws InterpreterException { + + String imageWidth = getProperty("zeppelin.R.image.width", "100%"); + // paragraph local propery 'imageWidth' can override this + if (interpreterContext.getLocalProperties().containsKey("imageWidth")) { + imageWidth = interpreterContext.getLocalProperties().get("imageWidth"); + } + try { + // render output with knitr + if (rbackendDead.get()) { + return new InterpreterResult(InterpreterResult.Code.ERROR, + "sparkR backend is dead, please try to increase spark.r.backendConnectionTimeout"); + } + if (useKnitr) { + zeppelinR.setInterpreterOutput(null); + zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```"); + zeppelinR.eval(".zres <- knit2html(text=.zcmd)"); + String html = zeppelinR.getS0(".zres"); + RDisplay rDisplay = ZeppelinRDisplay.render(html, imageWidth); + return new InterpreterResult( + rDisplay.getCode(), + rDisplay.getTyp(), + rDisplay.getContent() + ); + } else { + // alternatively, stream the output (without knitr) + zeppelinR.setInterpreterOutput(interpreterContext.out); + zeppelinR.eval(lines); + return new 
InterpreterResult(InterpreterResult.Code.SUCCESS, ""); + } + } catch (Exception e) { + logger.error("Exception while connecting to R", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); + } + } + + @Override + public void close() throws InterpreterException { + if (this.zeppelinR != null) { + zeppelinR.close(); + } + } + + @Override + public void cancel(InterpreterContext context) throws InterpreterException { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) throws InterpreterException { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + RInterpreter.class.getName() + this.hashCode()); + } + + @Override + public BaseZeppelinContext getZeppelinContext() { + return this.z; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return new ArrayList<>(); + } + + public AtomicBoolean getRbackendDead() { + return rbackendDead; + } + + public static RZeppelinContext getRZeppelinContext() { + return z; + } +} diff --git a/rlang/src/main/java/org/apache/zeppelin/r/RZeppelinContext.java b/rlang/src/main/java/org/apache/zeppelin/r/RZeppelinContext.java new file mode 100644 index 0000000..0f753e6 --- /dev/null +++ b/rlang/src/main/java/org/apache/zeppelin/r/RZeppelinContext.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.r; + +import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.InterpreterHookRegistry; + +import java.util.List; +import java.util.Map; + +/** + * ZeppelinContext for R, only contains the basic function of ZeppelinContext. + */ +public class RZeppelinContext extends BaseZeppelinContext { + + public RZeppelinContext(InterpreterHookRegistry hooks, int maxResult) { + super(hooks, maxResult); + } + + @Override + public Map<String, String> getInterpreterClassMap() { + return null; + } + + @Override + public List<Class> getSupportedClasses() { + return null; + } + + @Override + public String showData(Object obj, int maxResult) { + return null; + } +} diff --git a/rlang/src/main/java/org/apache/zeppelin/r/SparkRBackend.java b/rlang/src/main/java/org/apache/zeppelin/r/SparkRBackend.java new file mode 100644 index 0000000..1b07668 --- /dev/null +++ b/rlang/src/main/java/org/apache/zeppelin/r/SparkRBackend.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.r; + +import org.apache.spark.api.r.RBackend; +import scala.Tuple2; + + +/** + * SparkRBackend is responsible for communication between r process and jvm process. + * It uses Spark's RBackend to start a SocketServer in JVM side to listen request from R process. + */ +public class SparkRBackend { + private static SparkRBackend singleton; + + private RBackend backend = new RBackend(); + private boolean started = false; + private int portNumber = 0; + private String secret = ""; + private Thread backendThread; + + public synchronized static SparkRBackend get() { + if (singleton == null) { + singleton = new SparkRBackend(); + } + return singleton; + } + + private SparkRBackend() { + this.backendThread = new Thread("SparkRBackend") { + @Override + public void run() { + backend.run(); + } + }; + } + + public void init(boolean isSecretSocketSupported) throws Exception { + Class rBackendClass = RBackend.class; + if (isSecretSocketSupported) { + Tuple2<Integer, Object> result = + (Tuple2<Integer, Object>) rBackendClass.getMethod("init").invoke(backend); + portNumber = result._1; + Object rAuthHelper = result._2; + secret = (String) rAuthHelper.getClass().getMethod("secret").invoke(rAuthHelper); + } else { + portNumber = (Integer) rBackendClass.getMethod("init").invoke(backend); + } + } + + public void start() { + backendThread.start(); + started = true; + } + + public void close(){ + backend.close(); + try { + backendThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public boolean isStarted() { + return started; + } + + public int port(){ + return portNumber; + } + + public String socketSecret() { + return secret; + } +} diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/rlang/src/main/java/org/apache/zeppelin/r/ZeppelinR.java similarity index 76% rename from spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java rename to rlang/src/main/java/org/apache/zeppelin/r/ZeppelinR.java index 00226e9..1ca389b 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/rlang/src/main/java/org/apache/zeppelin/r/ZeppelinR.java @@ -14,19 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.zeppelin.spark; +package org.apache.zeppelin.r; -import org.apache.commons.exec.*; +import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkRBackend; +import org.apache.zeppelin.r.SparkRBackend; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.util.ProcessLauncher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -37,85 +40,26 @@ import java.util.Map; public class ZeppelinR { private static Logger LOGGER = LoggerFactory.getLogger(ZeppelinR.class); - private final SparkRInterpreter sparkRInterpreter; - private final String rCmdPath; - private final SparkVersion sparkVersion; - private final int timeout; + private RInterpreter rInterpreter; private RProcessLogOutputStream processOutputStream; - private final String scriptPath; - private final String libPath; static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap(new HashMap()); - private final int port; private RProcessLauncher rProcessLauncher; /** * Request to R repl */ - Request rRequestObject = null; - Integer rRequestNotifier = new Integer(0); - - public void setInterpreterOutput(InterpreterOutput out) { - processOutputStream.setInterpreterOutput(out); - } - - /** - * Request object - * - * type : "eval", "set", "get" - * stmt : statement to evaluate when type is "eval" - * key when type is "set" or "get" - * value : value object when type is "put" - */ - public static class Request { - String type; - String stmt; - Object value; - - public Request(String type, String stmt, Object value) { - this.type = type; - this.stmt = stmt; - this.value = value; - } - - public String getType() { - return type; - } - - public String getStmt() { - return stmt; - } - - public Object getValue() { - return value; - } - } + private Request rRequestObject = null; + private Integer rRequestNotifier = new Integer(0); /** * Response from R repl */ - Object rResponseValue = null; - boolean rResponseError = false; - Integer rResponseNotifier = new Integer(0); + private Object rResponseValue = null; + private boolean rResponseError = false; + private Integer rResponseNotifier = new Integer(0); - /** - * Create ZeppelinR instance - * @param rCmdPath R repl commandline path - * @param libPath sparkr library path - */ - public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort, - SparkVersion sparkVersion, int timeout, SparkRInterpreter sparkRInterpreter) { - this.rCmdPath = rCmdPath; - this.libPath = libPath; - this.sparkVersion = sparkVersion; - this.port = sparkRBackendPort; - this.timeout = timeout; - this.sparkRInterpreter = sparkRInterpreter; - try { - File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R"); - scriptPath = scriptFile.getAbsolutePath(); - } catch (IOException e) { - throw new RuntimeException(e); - } + public ZeppelinR(RInterpreter rInterpreter) { + this.rInterpreter = rInterpreter; } /** @@ -123,27 +67,69 @@ public class ZeppelinR { * @throws IOException */ public void open() throws IOException, InterpreterException { - createRScript(); + + String rCmdPath = rInterpreter.getProperty("zeppelin.R.cmd", "R"); + String sparkRLibPath; + + if 
(System.getenv("SPARK_HOME") != null) { + // local or yarn-client mode when SPARK_HOME is specified + sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib"; + } else if (System.getenv("ZEPPELIN_HOME") != null){ + // embedded mode when SPARK_HOME is not specified or for native R support + String interpreter = "r"; + if (rInterpreter.isSparkSupported()) { + interpreter = "spark"; + } + sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/" + interpreter + "/R/lib"; + // workaround to make sparkr work without SPARK_HOME + System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/" + interpreter); + } else { + // yarn-cluster mode + sparkRLibPath = "sparkr"; + } + if (!new File(sparkRLibPath).exists()) { + throw new InterpreterException(String.format("sparkRLib %s doesn't exist", sparkRLibPath)); + } + + File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R"); + FileOutputStream out = null; + InputStream in = null; + try { + out = new FileOutputStream(scriptFile); + in = getClass().getClassLoader().getResourceAsStream("R/zeppelin_sparkr.R"); + IOUtils.copy(in, out); + } catch (IOException e) { + throw new InterpreterException(e); + } finally { + if (out != null) { + out.close(); + } + if (in != null) { + in.close(); + } + } zeppelinR.put(hashCode(), this); + String timeout = rInterpreter.getProperty("spark.r.backendConnectionTimeout", "6000"); CommandLine cmd = CommandLine.parse(rCmdPath); cmd.addArgument("--no-save"); cmd.addArgument("--no-restore"); cmd.addArgument("-f"); - cmd.addArgument(scriptPath); + cmd.addArgument(scriptFile.getAbsolutePath()); cmd.addArgument("--args"); cmd.addArgument(Integer.toString(hashCode())); - cmd.addArgument(Integer.toString(port)); - cmd.addArgument(libPath); - cmd.addArgument(Integer.toString(sparkVersion.toNumber())); - cmd.addArgument(Integer.toString(timeout)); - if (sparkVersion.isSecretSocketSupported()) { - cmd.addArgument(SparkRBackend.socketSecret()); + cmd.addArgument(Integer.toString(SparkRBackend.get().port())); + cmd.addArgument(sparkRLibPath); + cmd.addArgument(rInterpreter.sparkVersion() + ""); + cmd.addArgument(timeout); + cmd.addArgument(rInterpreter.isSparkSupported() + ""); + if (rInterpreter.isSecretSupported()) { + cmd.addArgument(SparkRBackend.get().socketSecret()); } // dump out the R command to facilitate manually running it, e.g. 
for fault diagnosis purposes LOGGER.info("R Command: " + cmd.toString()); - processOutputStream = new RProcessLogOutputStream(sparkRInterpreter); + processOutputStream = new RProcessLogOutputStream(rInterpreter); Map env = EnvironmentUtils.getProcEnvironment(); rProcessLauncher = new RProcessLauncher(cmd, env, processOutputStream); rProcessLauncher.launch(); @@ -162,6 +148,42 @@ public class ZeppelinR { eval("cat('')"); } + public void setInterpreterOutput(InterpreterOutput out) { + processOutputStream.setInterpreterOutput(out); + } + + /** + * Request object + * + * type : "eval", "set", "get" + * stmt : statement to evaluate when type is "eval" + * key when type is "set" or "get" + * value : value object when type is "put" + */ + public static class Request { + String type; + String stmt; + Object value; + + public Request(String type, String stmt, Object value) { + this.type = type; + this.stmt = stmt; + this.value = value; + } + + public String getType() { + return type; + } + + public String getStmt() { + return stmt; + } + + public Object getValue() { + return value; + } + } + /** * Evaluate expression * @param expr @@ -289,37 +311,12 @@ public class ZeppelinR { } /** - * Create R script in tmp dir - */ - private void createRScript() throws InterpreterException { - ClassLoader classLoader = getClass().getClassLoader(); - File out = new File(scriptPath); - - if (out.exists() && out.isDirectory()) { - throw new InterpreterException("Can't create r script " + out.getAbsolutePath()); - } - - try { - FileOutputStream outStream = new FileOutputStream(out); - IOUtils.copy( - classLoader.getResourceAsStream("R/zeppelin_sparkr.R"), - outStream); - outStream.close(); - } catch (IOException e) { - throw new InterpreterException(e); - } - - LOGGER.info("File {} created", scriptPath); - } - - /** * Terminate this R repl */ public void close() { if (rProcessLauncher != null) { rProcessLauncher.stop(); } - new File(scriptPath).delete(); zeppelinR.remove(hashCode()); } @@ -371,10 +368,10 @@ public class ZeppelinR { public static class RProcessLogOutputStream extends ProcessLauncher.ProcessLogOutputStream { private InterpreterOutput interpreterOutput; - private SparkRInterpreter sparkRInterpreter; + private RInterpreter rInterpreter; - public RProcessLogOutputStream(SparkRInterpreter sparkRInterpreter) { - this.sparkRInterpreter = sparkRInterpreter; + public RProcessLogOutputStream(RInterpreter rInterpreter) { + this.rInterpreter = rInterpreter; } /** @@ -390,7 +387,7 @@ public class ZeppelinR { super.processLine(s, i); if (s.contains("Java SparkR backend might have failed") // spark 2.x || s.contains("Execution halted")) { // spark 1.x - sparkRInterpreter.getRbackendDead().set(true); + rInterpreter.getRbackendDead().set(true); } if (interpreterOutput != null) { try { diff --git a/rlang/src/main/java/org/apache/zeppelin/r/ZeppelinRDisplay.java b/rlang/src/main/java/org/apache/zeppelin/r/ZeppelinRDisplay.java new file mode 100644 index 0000000..8487631 --- /dev/null +++ b/rlang/src/main/java/org/apache/zeppelin/r/ZeppelinRDisplay.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.r; + +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.jsoup.Jsoup; +import org.jsoup.nodes.Document; +import org.jsoup.nodes.Element; +import org.jsoup.nodes.Document.OutputSettings; +import org.jsoup.safety.Whitelist; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +class RDisplay { + private String content; + private Type typ; + private Code code; + + public RDisplay(String content, Type typ, Code code) { + this.content = content; + this.typ = typ; + this.code = code; + } + + public String getContent() { + return content; + } + + public Type getTyp() { + return typ; + } + + public Code getCode() { + return code; + } +} + +public class ZeppelinRDisplay { + + private static Pattern pattern = Pattern.compile("^ *\\[\\d*\\]"); + + public static RDisplay render( String html, String imageWidth) { + + Document document = Jsoup.parse(html); + document.outputSettings().prettyPrint(false); + + Element body = document.body(); + + if (body.getElementsByTag("p").isEmpty()) { + return new RDisplay(body.html(), Type.HTML, Code.SUCCESS); + } + + String bodyHtml = body.html(); + + if (! bodyHtml.contains("<img") + && ! bodyHtml.contains("<script") + && ! bodyHtml.contains("%html ") + && ! bodyHtml.contains("%table ") + && ! 
bodyHtml.contains("%img ") + ) { + return textDisplay(body); + } + + if (bodyHtml.contains("%table")) { + return tableDisplay(body); + } + + if (bodyHtml.contains("%img")) { + return imgDisplay(body); + } + + return htmlDisplay(body, imageWidth); + } + + private static RDisplay textDisplay(Element body) { + // remove HTML tag while preserving whitespaces and newlines + String text = Jsoup.clean(body.html(), "", + Whitelist.none(), new OutputSettings().prettyPrint(false)); + return new RDisplay(text, Type.TEXT, Code.SUCCESS); + } + + private static RDisplay tableDisplay(Element body) { + String p = body.getElementsByTag("p").first().html().replace("“%table " , "").replace("”", ""); + Matcher matcher = pattern.matcher(p); + if (matcher.matches()) { + p = p.replace(matcher.group(), ""); + } + String table = p.replace("\\t", "\t").replace("\\n", "\n"); + return new RDisplay(table, Type.TABLE, Code.SUCCESS); + } + + private static RDisplay imgDisplay(Element body) { + String p = body.getElementsByTag("p").first().html().replace("“%img " , "").replace("”", ""); + Matcher matcher = pattern.matcher(p); + if (matcher.matches()) { + p = p.replace(matcher.group(), ""); + } + return new RDisplay(p, Type.IMG, Code.SUCCESS); + } + + private static RDisplay htmlDisplay(Element body, String imageWidth) { + String div = ""; + for (Element element : body.children()) { + String eHtml = element.html(); + String eOuterHtml = element.outerHtml(); + + eOuterHtml = eOuterHtml.replace("“%html " , "").replace("”", ""); + + Matcher matcher = pattern.matcher(eHtml); + if (matcher.matches()) { + eOuterHtml = eOuterHtml.replace(matcher.group(), ""); + } + + div = div + eOuterHtml; + } + + String content = div + .replaceAll("src=\"//", "src=\"http://") + .replaceAll("href=\"//", "href=\"http://"); + + body.html(content); + + for (Element image : body.getElementsByTag("img")) { + image.attr("width", imageWidth); + } + + return new RDisplay(body.html(), Type.HTML, Code.SUCCESS); + } +} diff --git a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R b/rlang/src/main/resources/R/zeppelin_sparkr.R similarity index 77% rename from spark/interpreter/src/main/resources/R/zeppelin_sparkr.R rename to rlang/src/main/resources/R/zeppelin_sparkr.R index 1596aac..94fa6a7 100644 --- a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R +++ b/rlang/src/main/resources/R/zeppelin_sparkr.R @@ -23,9 +23,10 @@ port <- as.integer(args[2]) libPath <- args[3] version <- as.integer(args[4]) timeout <- as.integer(args[5]) +isSparkSupported <- args[6] authSecret <- NULL -if (length(args) >= 6) { - authSecret <- args[6] +if (length(args) >= 7) { + authSecret <- args[7] } rm(args) @@ -46,19 +47,23 @@ if (is.null(authSecret)) { assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv) # getZeppelinR -.zeppelinR = SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinR", "getZeppelinR", hashCode) - -# setup spark env -assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv) -assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv) -if (version >= 20000) { - assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv) - assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) - assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv) -} 
-assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) -assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) -assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv) +.zeppelinR = SparkR:::callJStatic("org.apache.zeppelin.r.ZeppelinR", "getZeppelinR", hashCode) + +if (isSparkSupported == "true") { + # setup spark env + assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv) + assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv) + if (version >= 20000) { + assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv) + assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) + assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv) + } + assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) + assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) + assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv) +} else { + assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.r.RInterpreter", "getRZeppelinContext"), envir = .GlobalEnv) +} z.put <- function(name, object) { SparkR:::callJMethod(.zeppelinContext, "put", name, object) diff --git a/rlang/src/main/resources/interpreter-setting.json b/rlang/src/main/resources/interpreter-setting.json new file mode 100644 index 0000000..aab4306 --- /dev/null +++ b/rlang/src/main/resources/interpreter-setting.json @@ -0,0 +1,37 @@ +[ + { + "group": "r", + "name": "r", + "className": "org.apache.zeppelin.r.RInterpreter", + "properties": { + "zeppelin.R.knitr": { + "envName": "ZEPPELIN_R_KNITR", + "propertyName": "zeppelin.R.knitr", + "defaultValue": true, + "description": "whether use knitr or not", + "type": "checkbox" + }, + "zeppelin.R.cmd": { + "envName": "ZEPPELIN_R_CMD", + "propertyName": "zeppelin.R.cmd", + "defaultValue": "R", + "description": "R repl path", + "type": "string" + }, + "zeppelin.R.image.width": { + "envName": "ZEPPELIN_R_IMAGE_WIDTH", + "propertyName": "zeppelin.R.image.width", + "defaultValue": "100%", + "description": "", + "type": "number" + }, + "zeppelin.R.render.options": { + "envName": "ZEPPELIN_R_RENDER_OPTIONS", + "propertyName": "zeppelin.R.render.options", + "defaultValue": "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, warning = F, fig.retina = 2", + "description": "", + "type": "textarea" + } + } + } +] diff --git a/rlang/src/test/java/org/apache/zeppelin/r/RInterpreterTest.java b/rlang/src/test/java/org/apache/zeppelin/r/RInterpreterTest.java new file mode 100644 index 0000000..97b9844 --- /dev/null +++ b/rlang/src/test/java/org/apache/zeppelin/r/RInterpreterTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.r; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class RInterpreterTest { + + private RInterpreter rInterpreter; + + @Before + public void setUp() throws InterpreterException { + Properties properties = new Properties(); + properties.setProperty("zeppelin.R.knitr", "true"); + properties.setProperty("spark.r.backendConnectionTimeout", "10"); + + InterpreterContext context = getInterpreterContext(); + InterpreterContext.set(context); + rInterpreter = new RInterpreter(properties); + + InterpreterGroup interpreterGroup = new InterpreterGroup(); + interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(rInterpreter), "session_1"); + rInterpreter.setInterpreterGroup(interpreterGroup); + + rInterpreter.open(); + } + + @After + public void tearDown() throws InterpreterException { + rInterpreter.close(); + } + + @Test + public void testSparkRInterpreter() throws InterpreterException, InterruptedException { + InterpreterResult result = rInterpreter.interpret("1+1", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().get(0).getData().contains("2")); + + // plotting + InterpreterContext context = getInterpreterContext(); + context.getLocalProperties().put("imageWidth", "100"); + result = rInterpreter.interpret("hist(mtcars$mpg)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType()); + assertTrue(result.message().get(0).getData().contains("<img src=")); + assertTrue(result.message().get(0).getData().contains("width=\"100\"")); + + result = rInterpreter.interpret("library(ggplot2)\n" + + "ggplot(diamonds, aes(x=carat, y=price, color=cut)) + geom_point()", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType()); + assertTrue(result.message().get(0).getData().contains("<img src=")); + + // sparkr backend would be timeout after 10 seconds + Thread.sleep(15 * 1000); + result = rInterpreter.interpret("1+1", getInterpreterContext()); + 
assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().get(0).getData().contains("sparkR backend is dead")); + } + + @Test + public void testInvalidR() throws InterpreterException { + tearDown(); + + Properties properties = new Properties(); + properties.setProperty("zeppelin.R.cmd", "invalid_r"); + properties.setProperty("spark.master", "local"); + properties.setProperty("spark.app.name", "test"); + + InterpreterGroup interpreterGroup = new InterpreterGroup(); + Interpreter rInterpreter = new LazyOpenInterpreter(new RInterpreter(properties)); + interpreterGroup.addInterpreterToSession(rInterpreter, "session_1"); + rInterpreter.setInterpreterGroup(interpreterGroup); + + InterpreterContext context = getInterpreterContext(); + InterpreterContext.set(context); + + try { + rInterpreter.interpret("1+1", getInterpreterContext()); + fail("Should fail to open SparkRInterpreter"); + } catch (InterpreterException e) { + String stacktrace = ExceptionUtils.getStackTrace(e); + assertTrue(stacktrace, stacktrace.contains("No such file or directory")); + } + } + + private InterpreterContext getInterpreterContext() { + InterpreterContext context = InterpreterContext.builder() + .setNoteId("note_1") + .setParagraphId("paragraph_1") + .setInterpreterOut(new InterpreterOutput(null)) + .setLocalProperties(new HashMap<>()) + .build(); + return context; + } +} diff --git a/rlang/src/test/resources/log4j.properties b/rlang/src/test/resources/log4j.properties new file mode 100644 index 0000000..0d6d5f1 --- /dev/null +++ b/rlang/src/test/resources/log4j.properties @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n + +log4j.logger.org.apache.zeppelin.interpreter.util=DEBUG diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index 6e5c67a..703e2a4 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -134,6 +134,18 @@ </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>r</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-repl_${spark.scala.binary.version}</artifactId> <version>${spark.version}</version> diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index e276ea4..7eb1eef 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -17,43 +17,32 @@ package org.apache.zeppelin.spark; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.spark.SparkContext; -import org.apache.spark.SparkRBackend; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.zeppelin.interpreter.AbstractInterpreter; import org.apache.zeppelin.interpreter.BaseZeppelinContext; -import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.r.RInterpreter; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.zeppelin.spark.ZeppelinRDisplay.render; /** * R and SparkR interpreter with visualization support. 
*/ -public class SparkRInterpreter extends AbstractInterpreter { - private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class); +public class SparkRInterpreter extends RInterpreter { + private static final Logger LOGGER = LoggerFactory.getLogger(SparkRInterpreter.class); - private String renderOptions; private SparkInterpreter sparkInterpreter; + private SparkVersion sparkVersion; private boolean isSpark2; - private ZeppelinR zeppelinR; - private AtomicBoolean rbackendDead = new AtomicBoolean(false); private SparkContext sc; private JavaSparkContext jsc; @@ -62,39 +51,27 @@ public class SparkRInterpreter extends AbstractInterpreter { } @Override - public void open() throws InterpreterException { - String rCmdPath = getProperty("zeppelin.R.cmd", "R"); - String sparkRLibPath; - - if (System.getenv("SPARK_HOME") != null) { - // local or yarn-client mode when SPARK_HOME is specified - sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib"; - } else if (System.getenv("ZEPPELIN_HOME") != null){ - // embedded mode when SPARK_HOME is not specified - sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R/lib"; - // workaround to make sparkr work without SPARK_HOME - System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/spark"); - } else { - // yarn-cluster mode - sparkRLibPath = "sparkr"; - } - if (!new File(sparkRLibPath).exists()) { - throw new InterpreterException(String.format("sparkRLib %s doesn't exist", sparkRLibPath)); - } + protected boolean isSparkSupported() { + return true; + } + + @Override + protected boolean isSecretSupported() { + return sparkVersion.isSecretSocketSupported(); + } + + @Override + protected int sparkVersion() { + return new SparkVersion(sc.version()).toNumber(); + } + @Override + public void open() throws InterpreterException { this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class); this.sc = sparkInterpreter.getSparkContext(); this.jsc = sparkInterpreter.getJavaSparkContext(); - // Share the same SparkRBackend across sessions - SparkVersion sparkVersion = new SparkVersion(sc.version()); - synchronized (SparkRBackend.backend()) { - if (!SparkRBackend.isStarted()) { - SparkRBackend.init(sparkVersion); - SparkRBackend.start(); - } - } + this.sparkVersion = new SparkVersion(sc.version()); this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0); - int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 6000); ZeppelinRContext.setSparkContext(sc); ZeppelinRContext.setJavaSparkContext(jsc); @@ -103,21 +80,7 @@ public class SparkRInterpreter extends AbstractInterpreter { } ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext()); - - zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, this); - try { - zeppelinR.open(); - logger.info("ZeppelinR is opened successfully."); - } catch (IOException e) { - throw new InterpreterException("Exception while opening SparkRInterpreter", e); - } - - if (useKnitr()) { - zeppelinR.eval("library('knitr')"); - } - renderOptions = getProperty("zeppelin.R.render.options", - "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, " + - "warning = F, fig.retina = 2"); + super.open(); } @Override @@ -128,12 +91,6 @@ public class SparkRInterpreter extends AbstractInterpreter { String jobGroup = Utils.buildJobGroupId(interpreterContext); String jobDesc = 
Utils.buildJobDesc(interpreterContext);
     sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
-
-    String imageWidth = getProperty("zeppelin.R.image.width", "100%");
-    if (interpreterContext.getLocalProperties().containsKey("imageWidth")) {
-      imageWidth = interpreterContext.getLocalProperties().get("imageWidth");
-    }
-
     String setJobGroup = "";
     // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
     if (isSpark2) {
@@ -153,42 +110,12 @@ public class SparkRInterpreter extends AbstractInterpreter {
       }
       lines = setPoolStmt + "\n" + lines;
     }
-    try {
-      // render output with knitr
-      if (rbackendDead.get()) {
-        return new InterpreterResult(InterpreterResult.Code.ERROR,
-            "sparkR backend is dead, please try to increase spark.r.backendConnectionTimeout");
-      }
-      if (useKnitr()) {
-        zeppelinR.setInterpreterOutput(null);
-        zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
-        zeppelinR.eval(".zres <- knit2html(text=.zcmd)");
-        String html = zeppelinR.getS0(".zres");
-
-        RDisplay rDisplay = render(html, imageWidth);
-
-        return new InterpreterResult(
-            rDisplay.code(),
-            rDisplay.typ(),
-            rDisplay.content()
-        );
-      } else {
-        // alternatively, stream the output (without knitr)
-        zeppelinR.setInterpreterOutput(interpreterContext.out);
-        zeppelinR.eval(lines);
-        return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
-      }
-    } catch (Exception e) {
-      logger.error("Exception while connecting to R", e);
-      return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
-    }
+    return super.internalInterpret(lines, interpreterContext);
   }
 
   @Override
   public void close() throws InterpreterException {
-    if (this.zeppelinR != null) {
-      zeppelinR.close();
-    }
+    super.close();
     if (this.sparkInterpreter != null) {
       this.sparkInterpreter.close();
       this.sparkInterpreter = null;
@@ -232,12 +159,4 @@ public class SparkRInterpreter extends AbstractInterpreter {
                                                 InterpreterContext interpreterContext) {
     return new ArrayList<>();
   }
-
-  private boolean useKnitr() {
-    return Boolean.parseBoolean(getProperty("zeppelin.R.knitr", "true"));
-  }
-
-  public AtomicBoolean getRbackendDead() {
-    return rbackendDead;
-  }
 }
diff --git a/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala b/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
deleted file mode 100644
index 2dc3371..0000000
--- a/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.apache.spark.api.r.RBackend
-import org.apache.zeppelin.spark.SparkVersion
-
-object SparkRBackend {
-  val backend : RBackend = new RBackend()
-  private var started = false;
-  private var portNumber = 0;
-  private var secret: String = "";
-
-  val backendThread : Thread = new Thread("SparkRBackend") {
-    override def run() {
-      backend.run()
-    }
-  }
-
-  def init(version: SparkVersion) : Unit = {
-    val rBackendClass = classOf[RBackend]
-    if (version.isSecretSocketSupported) {
-      val result = rBackendClass.getMethod("init").invoke(backend).asInstanceOf[Tuple2[Int, Object]]
-      portNumber = result._1
-      val rAuthHelper = result._2
-      secret = rAuthHelper.getClass.getMethod("secret").invoke(rAuthHelper).asInstanceOf[String]
-    } else {
-      portNumber = rBackendClass.getMethod("init").invoke(backend).asInstanceOf[Int]
-    }
-  }
-
-  def start() : Unit = {
-    backendThread.start()
-    started = true
-  }
-
-  def close() : Unit = {
-    backend.close()
-    backendThread.join()
-  }
-
-  def isStarted() : Boolean = started
-
-  def port(): Int = portNumber
-
-  def socketSecret(): String = secret;
-}
diff --git a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala
deleted file mode 100644
index 9880691..0000000
--- a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark
-
-import org.apache.zeppelin.interpreter.InterpreterResult.Code
-import org.apache.zeppelin.interpreter.InterpreterResult.Code.{SUCCESS}
-import org.apache.zeppelin.interpreter.InterpreterResult.Type
-import org.apache.zeppelin.interpreter.InterpreterResult.Type.{TEXT, HTML, TABLE, IMG}
-import org.jsoup.Jsoup
-import org.jsoup.nodes.Element
-import org.jsoup.nodes.Document.OutputSettings
-import org.jsoup.safety.Whitelist
-
-import scala.collection.JavaConversions._
-import scala.util.matching.Regex
-
-class RDisplay(val content: String, val typ: Type, val code: Code)
-
-object ZeppelinRDisplay {
-
-  val pattern = new Regex("""^ *\[\d*\] """)
-
-  def render(html: String, imageWidth: String): RDisplay = {
-
-    val document = Jsoup.parse(html)
-    document.outputSettings().prettyPrint(false)
-
-    val body = document.body()
-
-    if (body.getElementsByTag("p").isEmpty) return new RDisplay(body.html(), HTML, SUCCESS)
-
-    val bodyHtml = body.html()
-
-    if (! bodyHtml.contains("<img")
-      && ! bodyHtml.contains("<script")
-      && ! bodyHtml.contains("%html ")
-      && ! bodyHtml.contains("%table ")
-      && ! bodyHtml.contains("%img ")
-    ) {
-      return textDisplay(body)
-    }
-
-    if (bodyHtml.contains("%table")) {
-      return tableDisplay(body)
-    }
-
-    if (bodyHtml.contains("%img")) {
-      return imgDisplay(body)
-    }
-
-    return htmlDisplay(body, imageWidth)
-  }
-
-  private def textDisplay(body: Element): RDisplay = {
-    // remove HTML tag while preserving whitespaces and newlines
-    val text = Jsoup.clean(body.html(), "",
-      Whitelist.none(), new OutputSettings().prettyPrint(false))
-    new RDisplay(text, TEXT, SUCCESS)
-  }
-
-  private def tableDisplay(body: Element): RDisplay = {
-    val p = body.getElementsByTag("p").first().html.replace("“%table " , "").replace("”", "")
-    val r = (pattern findFirstIn p).getOrElse("")
-    val table = p.replace(r, "").replace("\\t", "\t").replace("\\n", "\n")
-    new RDisplay(table, TABLE, SUCCESS)
-  }
-
-  private def imgDisplay(body: Element): RDisplay = {
-    val p = body.getElementsByTag("p").first().html.replace("“%img " , "").replace("”", "")
-    val r = (pattern findFirstIn p).getOrElse("")
-    val img = p.replace(r, "")
-    new RDisplay(img, IMG, SUCCESS)
-  }
-
-  private def htmlDisplay(body: Element, imageWidth: String): RDisplay = {
-    var div = new String()
-
-    for (element <- body.children) {
-
-      val eHtml = element.html()
-      var eOuterHtml = element.outerHtml()
-
-      eOuterHtml = eOuterHtml.replace("“%html " , "").replace("”", "")
-
-      val r = (pattern findFirstIn eHtml).getOrElse("")
-
-      div = div + eOuterHtml.replace(r, "")
-    }
-
-    val content = div
-      .replaceAll("src=\"//", "src=\"http://")
-      .replaceAll("href=\"//", "href=\"http://")
-
-    body.html(content)
-
-    for (image <- body.getElementsByTag("img")) {
-      image.attr("width", imageWidth)
-    }
-
-    new RDisplay(body.html, HTML, SUCCESS)
-  }
-}
diff --git a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala
deleted file mode 100644
index 8181434..0000000
--- a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.utils
-
-import java.lang.StringBuilder
-
-import org.apache.spark.rdd.RDD
-
-import scala.collection.IterableLike
-
-object DisplayUtils {
-
-  implicit def toDisplayRDDFunctions[T <: Product](rdd: RDD[T]): DisplayRDDFunctions[T] = new DisplayRDDFunctions[T](rdd)
-
-  implicit def toDisplayTraversableFunctions[T <: Product](traversable: Traversable[T]): DisplayTraversableFunctions[T] = new DisplayTraversableFunctions[T](traversable)
-
-  def html(htmlContent: String = "") = s"%html $htmlContent"
-
-  def img64(base64Content: String = "") = s"%img $base64Content"
-
-  def img(url: String) = s"<img src='$url' />"
-}
-
-trait DisplayCollection[T <: Product] {
-
-  def printFormattedData(traversable: Traversable[T], columnLabels: String*): Unit = {
-    val providedLabelCount: Int = columnLabels.size
-    var maxColumnCount:Int = 1
-    val headers = new StringBuilder("%table ")
-
-    val data = new StringBuilder("")
-
-    traversable.foreach(tuple => {
-      maxColumnCount = math.max(maxColumnCount,tuple.productArity)
-      data.append(tuple.productIterator.mkString("\t")).append("\n")
-    })
-
-    if (providedLabelCount > maxColumnCount) {
-      headers.append(columnLabels.take(maxColumnCount).mkString("\t")).append("\n")
-    } else if (providedLabelCount < maxColumnCount) {
-      val missingColumnHeaders = ((providedLabelCount+1) to maxColumnCount).foldLeft[String](""){
-        (stringAccumulator,index) => if (index==1) s"Column$index" else s"$stringAccumulator\tColumn$index"
-      }
-
-      headers.append(columnLabels.mkString("\t")).append(missingColumnHeaders).append("\n")
-    } else {
-      headers.append(columnLabels.mkString("\t")).append("\n")
-    }
-
-    headers.append(data)
-
-    print(headers.toString)
-  }
-
-}
-
-class DisplayRDDFunctions[T <: Product] (val rdd: RDD[T]) extends DisplayCollection[T] {
-
-  def display(columnLabels: String*)(implicit sparkMaxResult: SparkMaxResult): Unit = {
-    printFormattedData(rdd.take(sparkMaxResult.maxResult), columnLabels: _*)
-  }
-
-  def display(sparkMaxResult:Int, columnLabels: String*): Unit = {
-    printFormattedData(rdd.take(sparkMaxResult), columnLabels: _*)
-  }
-}
-
-class DisplayTraversableFunctions[T <: Product] (val traversable: Traversable[T]) extends DisplayCollection[T] {
-
-  def display(columnLabels: String*): Unit = {
-    printFormattedData(traversable, columnLabels: _*)
-  }
-}
-
-class SparkMaxResult(val maxResult: Int) extends Serializable
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 1aaef8a..6584391 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -26,13 +26,11 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
-import org.apache.zeppelin.python.PythonInterpreter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.Properties;
diff --git a/spark/interpreter/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala b/spark/interpreter/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala
deleted file mode 100644
index 2638f17..0000000
--- a/spark/interpreter/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.spark.utils
-
-import java.io.ByteArrayOutputStream
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{SparkContext, SparkConf}
-import org.scalatest._
-import org.scalatest.{BeforeAndAfter}
-
-case class Person(login : String, name: String, age: Int)
-
-class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAfterEach with Matchers {
-  var sc: SparkContext = null
-  var testTuples:List[(String, String, Int)] = null
-  var testPersons:List[Person] = null
-  var testRDDTuples: RDD[(String,String,Int)] = null
-  var testRDDPersons: RDD[Person] = null
-  var stream: ByteArrayOutputStream = null
-
-  before {
-    val sparkConf: SparkConf = new SparkConf(true)
-      .setAppName("test-DisplayFunctions")
-      .setMaster("local")
-    sc = new SparkContext(sparkConf)
-    testTuples = List(("jdoe", "John DOE", 32), ("hsue", "Helen SUE", 27), ("rsmith", "Richard SMITH", 45))
-    testRDDTuples = sc.parallelize(testTuples)
-    testPersons = List(Person("jdoe", "John DOE", 32), Person("hsue", "Helen SUE", 27), Person("rsmith", "Richard SMITH", 45))
-    testRDDPersons = sc.parallelize(testPersons)
-  }
-
-  override def beforeEach() {
-    stream = new java.io.ByteArrayOutputStream()
-    super.beforeEach() // To be stackable, must call super.beforeEach
-  }
-
-
-  "DisplayFunctions" should "generate correct column headers for tuples" in {
-    implicit val sparkMaxResult = new SparkMaxResult(100)
-    Console.withOut(stream) {
-      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age")
-    }
-
-    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
-      "jdoe\tJohn DOE\t32\n" +
-      "hsue\tHelen SUE\t27\n" +
-      "rsmith\tRichard SMITH\t45\n")
-  }
-
-  "DisplayFunctions" should "generate correct column headers for case class" in {
-    implicit val sparkMaxResult = new SparkMaxResult(100)
-    Console.withOut(stream) {
-      new DisplayRDDFunctions[Person](testRDDPersons).display("Login","Name","Age")
-    }
-
-    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
-      "jdoe\tJohn DOE\t32\n" +
-      "hsue\tHelen SUE\t27\n" +
-      "rsmith\tRichard SMITH\t45\n")
-  }
-
-  "DisplayFunctions" should "truncate exceeding column headers for tuples" in {
-    implicit val sparkMaxResult = new SparkMaxResult(100)
-    Console.withOut(stream) {
-      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age","xxx","yyy")
-    }
-
-    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
-      "jdoe\tJohn DOE\t32\n" +
-      "hsue\tHelen SUE\t27\n" +
-      "rsmith\tRichard SMITH\t45\n")
-  }
-
-  "DisplayFunctions" should "pad missing column headers with ColumnXXX for tuples" in {
-    implicit val sparkMaxResult = new SparkMaxResult(100)
-    Console.withOut(stream) {
-      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login")
-    }
-
-    stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" +
-      "jdoe\tJohn DOE\t32\n" +
-      "hsue\tHelen SUE\t27\n" +
-      "rsmith\tRichard SMITH\t45\n")
-  }
-
-  "DisplayUtils" should "restricts RDD to sparkMaxresult with implicit limit" in {
-
-    implicit val sparkMaxResult = new SparkMaxResult(2)
-
-    Console.withOut(stream) {
-      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login")
-    }
-
-    stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" +
-      "jdoe\tJohn DOE\t32\n" +
-      "hsue\tHelen SUE\t27\n")
-  }
-
-  "DisplayUtils" should "restricts RDD to sparkMaxresult with explicit limit" in {
-
-    implicit val sparkMaxResult = new SparkMaxResult(2)
-
-    Console.withOut(stream) {
-      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display(1,"Login")
-    }
-
-    stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" +
-      "jdoe\tJohn DOE\t32\n")
-  }
-
-  "DisplayFunctions" should "display traversable of tuples" in {
-
-    Console.withOut(stream) {
-      new DisplayTraversableFunctions[(String,String,Int)](testTuples).display("Login","Name","Age")
-    }
-
-    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
-      "jdoe\tJohn DOE\t32\n" +
-      "hsue\tHelen SUE\t27\n" +
-      "rsmith\tRichard SMITH\t45\n")
-  }
-
-  "DisplayFunctions" should "display traversable of case class" in {
-
-    Console.withOut(stream) {
-      new DisplayTraversableFunctions[Person](testPersons).display("Login","Name","Age")
-    }
-
-    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
-      "jdoe\tJohn DOE\t32\n" +
-      "hsue\tHelen SUE\t27\n" +
-      "rsmith\tRichard SMITH\t45\n")
-  }
-
-  "DisplayUtils" should "display HTML" in {
-    DisplayUtils.html() should be ("%html ")
-    DisplayUtils.html("test") should be ("%html test")
-  }
-
-  "DisplayUtils" should "display img" in {
-    DisplayUtils.img("http://www.google.com") should be ("<img src='http://www.google.com' />")
-    DisplayUtils.img64() should be ("%img ")
-    DisplayUtils.img64("abcde") should be ("%img abcde")
-  }
-
-  override def afterEach() {
-    try super.afterEach() // To be stackable, must call super.afterEach
-    stream = null
-  }
-
-  after {
-    sc.stop()
-  }
-
-
-}
-
-
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index 644feea..9d898e6 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -503,6 +503,10 @@
           <groupId>org.apache.zeppelin</groupId>
           <artifactId>zeppelin-python</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.zeppelin</groupId>
+          <artifactId>r</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
   </dependencies>