Repository: incubator-zeppelin Updated Branches: refs/heads/master d5e87fb8b -> 7d6cc7e99
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 76f4a31..c7d9312 100755 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.maven-v4_0_0.xsd"> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/spark-dependencies/pom.xml ---------------------------------------------------------------------- diff --git a/spark-dependencies/pom.xml b/spark-dependencies/pom.xml index 02d24e2..8e23f22 100644 --- a/spark-dependencies/pom.xml +++ b/spark-dependencies/pom.xml @@ -54,6 +54,7 @@ <spark.download.url> http://archive.apache.org/dist/spark/${spark.archive}/${spark.archive}.tgz </spark.download.url> + <spark.bin.download.url>http://archive.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}-bin-without-hadoop.tgz</spark.bin.download.url> <spark.dist.cache>${project.build.directory}/../../.spark-dist</spark.dist.cache> <py4j.version>0.8.2.1</py4j.version> </properties> @@ -802,10 +803,10 @@ <configuration> <filesets> <fileset> - <directory>${basedir}/../python/build</directory> + <directory>${project.build.directory}/spark-dist</directory> </fileset> <fileset> - <directory>${project.build.directory}/spark-dist</directory> + <directory>${basedir}/../python/build</directory> </fileset> </filesets> </configuration> @@ -844,7 +845,65 @@ <zip destfile="${project.build.directory}/../../interpreter/spark/pyspark/pyspark.zip" basedir="${project.build.directory}/spark-dist/${spark.archive}/python" 
includes="pyspark/*.py,pyspark/**/*.py"/> - </target> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + + <profile> + <id>sparkr</id> + <build> + <plugins> + <plugin> + <groupId>com.googlecode.maven-download-plugin</groupId> + <artifactId>download-maven-plugin</artifactId> + <version>1.2.1</version> + <executions> + <execution> + <id>download-sparkr-files</id> + <phase>validate</phase> + <goals> + <goal>wget</goal> + </goals> + <configuration> + <url>${spark.bin.download.url}</url> + <unpack>true</unpack> + <outputDirectory>${project.build.directory}/spark-bin-dist</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-clean-plugin</artifactId> + <configuration> + <filesets> + <fileset> + <directory>${project.build.directory}/spark-bin-dist</directory> + </fileset> + </filesets> + </configuration> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>2.7</version> + <executions> + <execution> + <id>copy-sparkr-files</id> + <phase>generate-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/spark/R/lib</outputDirectory> + <resources> + <resource> + <directory>${project.build.directory}/spark-bin-dist/spark-${spark.version}-bin-without-hadoop/R/lib</directory> + </resource> + </resources> </configuration> </execution> </executions> @@ -852,6 +911,7 @@ </plugins> </build> </profile> + </profiles> <build> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index ece7467..3b88f3b 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -35,12 +35,14 @@ <url>http://zeppelin.incubator.apache.org</url> <properties> + <jsoup.version>1.8.2</jsoup.version> + 
<mockito.version>1.10.19</mockito.version> + <powermock.version>1.6.4</powermock.version> <spark.version>1.4.1</spark.version> <scala.version>2.10.4</scala.version> <scala.binary.version>2.10</scala.binary.version> </properties> - <dependencies> <dependency> <groupId>org.slf4j</groupId> @@ -231,6 +233,12 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>org.jsoup</groupId> + <artifactId>jsoup</artifactId> + <version>${jsoup.version}</version> + </dependency> + <!--TEST--> <dependency> <groupId>org.scalatest</groupId> @@ -266,6 +274,25 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <build> @@ -285,7 +312,7 @@ <exclude>**/derby.log</exclude> <exclude>**/metastore_db/</exclude> <exclude>**/README.md</exclude> - <exclude>dependency-reduced-pom.xml</exclude> + <exclude>**/dependency-reduced-pom.xml</exclude> </excludes> </configuration> </plugin> @@ -403,4 +430,56 @@ </plugins> </build> + + <profiles> + + <!-- to deactivate 'exclude-sparkr' automatically when 'spark' is activated --> + <profile> + <id>sparkr</id> + </profile> + + <profile> + <id>exclude-sparkr</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/SparkRInterpreter.java</exclude> + </excludes> + <testExcludes> + <testExclude>**/SparkRInterpreterTest.java</testExclude> + 
<testExclude>**/ZeppelinRTest.java</testExclude> + </testExcludes> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/ZeppelinR.scala</exclude> + <exclude>**/SparkRBackend.scala</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/SparkRInterpreterTest.java</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + </project> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java new file mode 100644 index 0000000..e0ea766 --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.spark; + +import static org.apache.zeppelin.spark.ZeppelinRDisplay.render; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.spark.SparkRBackend; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * R and SparkR interpreter with visualization support. + */ +public class SparkRInterpreter extends Interpreter { + private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class); + + private static String renderOptions; + private ZeppelinR zeppelinR; + + static { + Interpreter.register( + "r", + "spark", + SparkRInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add("zeppelin.R.cmd", + SparkInterpreter.getSystemDefault("ZEPPELIN_R_CMD", "zeppelin.R.cmd", "R"), + "R repl path") + .add("zeppelin.R.knitr", + SparkInterpreter.getSystemDefault("ZEPPELIN_R_KNITR", "zeppelin.R.knitr", "true"), + "whether use knitr or not") + .add("zeppelin.R.image.width", + SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH", + "zeppelin.R.image.width", "100%"), + "") + .add("zeppelin.R.render.options", + SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS", + "zeppelin.R.render.options", + "out.format = 'html', comment = NA, " + + "echo = FALSE, results = 'asis', message = F, warning = F"), + "") + .build()); + } + + + public SparkRInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + String rCmdPath = getProperty("zeppelin.R.cmd"); + String sparkRLibPath; + + if (System.getenv("SPARK_HOME") != null) { + sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib"; + } else { + sparkRLibPath = 
System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R/lib"; + // workaround to make sparkr work without SPARK_HOME + System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/spark"); + } + + synchronized (SparkRBackend.backend()) { + if (!SparkRBackend.isStarted()) { + SparkRBackend.init(); + SparkRBackend.start(); + } + } + + int port = SparkRBackend.port(); + + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext()); + ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); + ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext()); + + zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port); + try { + zeppelinR.open(); + } catch (IOException e) { + logger.error("Exception while opening SparkRInterpreter", e); + throw new InterpreterException(e); + } + + if (useKnitr()) { + zeppelinR.eval("library('knitr')"); + } + renderOptions = getProperty("zeppelin.R.render.options"); + } + + @Override + public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) { + + String imageWidth = getProperty("zeppelin.R.image.width"); + + String[] sl = lines.split("\n"); + if (sl[0].contains("{") && sl[0].contains("}")) { + String jsonConfig = sl[0].substring(sl[0].indexOf("{"), sl[0].indexOf("}") + 1); + ObjectMapper m = new ObjectMapper(); + try { + JsonNode rootNode = m.readTree(jsonConfig); + JsonNode imageWidthNode = rootNode.path("imageWidth"); + if (!imageWidthNode.isMissingNode()) imageWidth = imageWidthNode.textValue(); + } + catch (Exception e) { + logger.warn("Can not parse json config: " + jsonConfig, e); + } + finally { + lines = lines.replace(jsonConfig, ""); + } + } + + try { + // render output with knitr + if (useKnitr()) { + zeppelinR.setInterpreterOutput(null); + zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```"); + zeppelinR.eval(".zres <- knit2html(text=.zcmd)"); + String html = 
zeppelinR.getS0(".zres"); + + RDisplay rDisplay = render(html, imageWidth); + + return new InterpreterResult( + rDisplay.code(), + rDisplay.type(), + rDisplay.content() + ); + } else { + // alternatively, stream the output (without knitr) + zeppelinR.setInterpreterOutput(interpreterContext.out); + zeppelinR.eval(lines); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, ""); + } + } catch (Exception e) { + logger.error("Exception while connecting to R", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); + } finally { + try { + } catch (Exception e) { + // Do nothing... + } + } + } + + @Override + public void close() { + zeppelinR.close(); + } + + @Override + public void cancel(InterpreterContext context) {} + + @Override + public FormType getFormType() { + return FormType.NONE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + SparkRInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List<String> completion(String buf, int cursor) { + return new ArrayList<String>(); + } + + private SparkInterpreter getSparkInterpreter() { + LazyOpenInterpreter lazy = null; + SparkInterpreter spark = null; + Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); + + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + spark = (SparkInterpreter) p; + + if (lazy != null) { + lazy.open(); + } + return spark; + } + + private boolean useKnitr() { + try { + return Boolean.parseBoolean(getProperty("zeppelin.R.knitr")); + } catch (Exception e) { + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java 
---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java new file mode 100644 index 0000000..8d92c96 --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.spark; + +import org.apache.commons.exec.*; +import org.apache.commons.exec.environment.EnvironmentUtils; +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterOutputListener; +import parquet.org.slf4j.Logger; +import parquet.org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * R repl interaction + */ +public class ZeppelinR implements ExecuteResultHandler { + Logger logger = LoggerFactory.getLogger(ZeppelinR.class); + private final String rCmdPath; + private DefaultExecutor executor; + private SparkOutputStream outputStream; + private PipedOutputStream input; + private final String scriptPath; + private final String libPath; + static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap( + new HashMap<Integer, ZeppelinR>()); + + private InterpreterOutput initialOutput; + private final int port; + private boolean rScriptRunning; + + /** + * To be notified R repl initialization + */ + boolean rScriptInitialized = false; + Integer rScriptInitializeNotifier = new Integer(0); + + + /** + * Request to R repl + */ + Request rRequestObject = null; + Integer rRequestNotifier = new Integer(0); + + /** + * Request object + * + * type : "eval", "set", "get" + * stmt : statement to evaluate when type is "eval" + * key when type is "set" or "get" + * value : value object when type is "put" + */ + public static class Request { + String type; + String stmt; + Object value; + + public Request(String type, String stmt, Object value) { + this.type = type; + this.stmt = stmt; + this.value = value; + } + + public String getType() { + return type; + } + + public String getStmt() { + return stmt; + } + + public Object getValue() { + return value; + } + } + + /** + * Response from R repl + */ + Object 
rResponseValue = null; + boolean rResponseError = false; + Integer rResponseNotifier = new Integer(0); + + + + /** + * Create ZeppelinR instance + * @param rCmdPath R repl commandline path + * @param libPath sparkr library path + */ + public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort) { + this.rCmdPath = rCmdPath; + this.libPath = libPath; + this.port = sparkRBackendPort; + scriptPath = System.getProperty("java.io.tmpdir") + "/zeppelin_sparkr.R"; + + } + + /** + * Start R repl + * @throws IOException + */ + public void open() throws IOException { + createRScript(); + + zeppelinR.put(hashCode(), this); + + CommandLine cmd = CommandLine.parse(rCmdPath); + cmd.addArgument("--no-save"); + cmd.addArgument("--no-restore"); + cmd.addArgument("-f"); + cmd.addArgument(scriptPath); + cmd.addArgument("--args"); + cmd.addArgument(Integer.toString(hashCode())); + cmd.addArgument(Integer.toString(port)); + cmd.addArgument(libPath); + + executor = new DefaultExecutor(); + outputStream = new SparkOutputStream(); + + input = new PipedOutputStream(); + PipedInputStream in = new PipedInputStream(input); + + PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in); + executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); + executor.setStreamHandler(streamHandler); + Map env = EnvironmentUtils.getProcEnvironment(); + + + initialOutput = new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + logger.debug(new String(line)); + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + } + }); + outputStream.setInterpreterOutput(initialOutput); + executor.execute(cmd, env, this); + rScriptRunning = true; + + // flush output + eval("cat('')"); + } + + /** + * Evaluate expression + * @param expr + * @return + */ + public Object eval(String expr) { + synchronized (this) { + rRequestObject = new Request("eval", expr, null); 
+ return request(); + } + } + + /** + * assign value to key + * @param key + * @param value + */ + public void set(String key, Object value) { + synchronized (this) { + rRequestObject = new Request("set", key, value); + request(); + } + } + + /** + * get value of key + * @param key + * @return + */ + public Object get(String key) { + synchronized (this) { + rRequestObject = new Request("get", key, null); + return request(); + } + } + + /** + * get value of key, as a string + * @param key + * @return + */ + public String getS0(String key) { + synchronized (this) { + rRequestObject = new Request("getS", key, null); + return (String) request(); + } + } + + + /** + * Send request to r repl and return response + * @return responseValue + */ + private Object request() throws RuntimeException { + if (!rScriptRunning) { + throw new RuntimeException("r repl is not running"); + } + + // wait for rscript initialized + if (!rScriptInitialized) { + waitForRScriptInitialized(); + } + + rResponseValue = null; + + synchronized (rRequestNotifier) { + rRequestNotifier.notify(); + } + + Object respValue = null; + synchronized (rResponseNotifier) { + while (rResponseValue == null && rScriptRunning) { + try { + rResponseNotifier.wait(1000); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + respValue = rResponseValue; + rResponseValue = null; + } + + if (rResponseError) { + throw new RuntimeException(respValue.toString()); + } else { + return respValue; + } + } + + + /** + * Wait until src/main/resources/R/zeppelin_sparkr.R is initialized + * and call onScriptInitialized() + * + * @throws InterpreterException + */ + private void waitForRScriptInitialized() throws InterpreterException { + synchronized (rScriptInitializeNotifier) { + long startTime = System.nanoTime(); + while (rScriptInitialized == false && + rScriptRunning && + System.nanoTime() - startTime < 10L * 1000 * 1000000) { + try { + rScriptInitializeNotifier.wait(1000); + } catch 
(InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + } + + String errorMessage = ""; + try { + initialOutput.flush(); + errorMessage = new String(initialOutput.toByteArray()); + } catch (IOException e) { + e.printStackTrace(); + } + + + if (rScriptInitialized == false) { + throw new InterpreterException("sparkr is not responding " + errorMessage); + } + } + + + + /** + * invoked by src/main/resources/R/zeppelin_sparkr.R + * @return + */ + public Request getRequest() { + synchronized (rRequestNotifier) { + while (rRequestObject == null) { + try { + rRequestNotifier.wait(1000); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + Request req = rRequestObject; + rRequestObject = null; + return req; + } + } + + /** + * invoked by src/main/resources/R/zeppelin_sparkr.R + * @param value + * @param error + */ + public void setResponse(Object value, boolean error) { + synchronized (rResponseNotifier) { + rResponseValue = value; + rResponseError = error; + rResponseNotifier.notify(); + } + } + + /** + * invoked by src/main/resources/R/zeppelin_sparkr.R + */ + public void onScriptInitialized() { + synchronized (rScriptInitializeNotifier) { + rScriptInitialized = true; + rScriptInitializeNotifier.notifyAll(); + } + } + + + /** + * Create R script in tmp dir + */ + private void createRScript() { + ClassLoader classLoader = getClass().getClassLoader(); + File out = new File(scriptPath); + + if (out.exists() && out.isDirectory()) { + throw new InterpreterException("Can't create r script " + out.getAbsolutePath()); + } + + try { + FileOutputStream outStream = new FileOutputStream(out); + IOUtils.copy( + classLoader.getResourceAsStream("R/zeppelin_sparkr.R"), + outStream); + outStream.close(); + } catch (IOException e) { + throw new InterpreterException(e); + } + + logger.info("File {} created", scriptPath); + } + + /** + * Terminate this R repl + */ + public void close() { + executor.getWatchdog().destroyProcess(); + new 
File(scriptPath).delete(); + zeppelinR.remove(hashCode()); + } + + /** + * Get instance + * This method will be invoked from zeppelin_sparkr.R + * @param hashcode + * @return + */ + public static ZeppelinR getZeppelinR(int hashcode) { + return zeppelinR.get(hashcode); + } + + + /** + * Pass InterpreterOutput to capture the repl output + * @param out + */ + public void setInterpreterOutput(InterpreterOutput out) { + outputStream.setInterpreterOutput(out); + } + + + + @Override + public void onProcessComplete(int i) { + logger.info("process complete {}", i); + rScriptRunning = false; + } + + @Override + public void onProcessFailed(ExecuteException e) { + logger.error(e.getMessage(), e); + rScriptRunning = false; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java new file mode 100644 index 0000000..82c320d --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark; + +import org.apache.spark.SparkContext; +import org.apache.spark.sql.SQLContext; + +/** + * Contains the Spark and Zeppelin Contexts made available to SparkR. + */ +public class ZeppelinRContext { + private static SparkContext sparkContext; + private static SQLContext sqlContext; + private static ZeppelinContext zeppelinContext; + + public static void setSparkContext(SparkContext sparkContext) { + ZeppelinRContext.sparkContext = sparkContext; + } + + public static void setZepplinContext(ZeppelinContext zeppelinContext) { + ZeppelinRContext.zeppelinContext = zeppelinContext; + } + + public static void setSqlContext(SQLContext sqlContext) { + ZeppelinRContext.sqlContext = sqlContext; + } + + public static SparkContext getSparkContext() { + return sparkContext; + } + + public static SQLContext getSqlContext() { + return sqlContext; + } + + public static ZeppelinContext getZeppelinContext() { + return zeppelinContext; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/spark/src/main/resources/R/zeppelin_sparkr.R ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R new file mode 100644 index 0000000..fe2a16b --- /dev/null +++ b/spark/src/main/resources/R/zeppelin_sparkr.R @@ -0,0 +1,99 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +args <- commandArgs(trailingOnly = TRUE) + +hashCode <- as.integer(args[1]) +port <- as.integer(args[2]) +libPath <- args[3] +rm(args) + +print(paste("Port ", toString(port))) +print(paste("LibPath ", libPath)) + +.libPaths(c(file.path(libPath), .libPaths())) +library(SparkR) + + +SparkR:::connectBackend("localhost", port) + +# scStartTime is needed by R/pkg/R/sparkR.R +assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv) + +# getZeppelinR +.zeppelinR = SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinR", "getZeppelinR", hashCode) + +# setup spark env +assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv) +assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv) +assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) +assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) +assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv) + +z.put <- function(name, object) { + SparkR:::callJMethod(.zeppelinContext, "put", name, object) +} +z.get <- function(name) { + SparkR:::callJMethod(.zeppelinContext, "get", name) +} +z.input <- function(name, value) { + SparkR:::callJMethod(.zeppelinContext, "input", name, value) +} + +# notify script is initialized +SparkR:::callJMethod(.zeppelinR, "onScriptInitialized") + +while (TRUE) { + req <- 
SparkR:::callJMethod(.zeppelinR, "getRequest") + type <- SparkR:::callJMethod(req, "getType") + stmt <- SparkR:::callJMethod(req, "getStmt") + value <- SparkR:::callJMethod(req, "getValue") + + if (type == "eval") { + tryCatch({ + ret <- eval(parse(text=stmt)) + SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE) + }, error = function(e) { + SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE) + }) + } else if (type == "set") { + tryCatch({ + ret <- assign(stmt, value) + SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE) + }, error = function(e) { + SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE) + }) + } else if (type == "get") { + tryCatch({ + ret <- eval(parse(text=stmt)) + SparkR:::callJMethod(.zeppelinR, "setResponse", ret, FALSE) + }, error = function(e) { + SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE) + }) + } else if (type == "getS") { + tryCatch({ + ret <- eval(parse(text=stmt)) + SparkR:::callJMethod(.zeppelinR, "setResponse", toString(ret), FALSE) + }, error = function(e) { + SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE) + }) + } else { + # unsupported type + SparkR:::callJMethod(.zeppelinR, "setResponse", paste("Unsupported type ", type), TRUE) + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/spark/src/main/scala/org/apache/spark/SparkRBackend.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/spark/SparkRBackend.scala b/spark/src/main/scala/org/apache/spark/SparkRBackend.scala new file mode 100644 index 0000000..05f1ac0 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/SparkRBackend.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark + +import org.apache.spark.api.r.RBackend + +object SparkRBackend { + val backend : RBackend = new RBackend() + private var started = false; + private var portNumber = 0; + + val backendThread : Thread = new Thread("SparkRBackend") { + override def run() { + backend.run() + } + } + + def init() : Int = { + portNumber = backend.init() + portNumber + } + + def start() : Unit = { + backendThread.start() + started = true + } + + def close() : Unit = { + backend.close() + backendThread.join() + } + + def isStarted() : Boolean = { + started + } + + def port(): Int = { + return portNumber + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/spark/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala b/spark/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala new file mode 100644 index 0000000..8607226 --- /dev/null +++ b/spark/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
package org.apache.zeppelin.spark

import org.apache.zeppelin.interpreter.InterpreterResult.Code
import org.apache.zeppelin.interpreter.InterpreterResult.Code.{SUCCESS, ERROR}
import org.apache.zeppelin.interpreter.InterpreterResult.Type
import org.apache.zeppelin.interpreter.InterpreterResult.Type.{TEXT, HTML, TABLE, IMG}
import org.jsoup.Jsoup
import org.jsoup.nodes.Element
import org.jsoup.nodes.Document

import scala.collection.JavaConversions._

import scala.util.matching.Regex

/** Rendered R output: the payload, its display type, and the result code. */
case class RDisplay(content: String, `type`: Type, code: Code)

/**
 * Turns the HTML that the R process emits into a Zeppelin display result,
 * dispatching on the %table / %img / %html markers embedded in the output.
 */
object ZeppelinRDisplay {

  // R prefixes printed vectors with an index like "[1] "; strip it.
  val pattern = new Regex("""^ *\[\d*\] """)

  /**
   * Parses `html` and picks the display mode from the markers it contains.
   *
   * @param html       raw HTML produced by the R side
   * @param imageWidth width applied to every <img> in HTML output
   */
  def render(html: String, imageWidth: String): RDisplay = {

    val document = Jsoup.parse(html)
    // Keep whitespace intact: table/text payloads are whitespace-sensitive.
    document.outputSettings().prettyPrint(false)

    val body = document.body()

    // No <p> at all: R emitted pure markup, pass it through as HTML.
    if (body.getElementsByTag("p").isEmpty) return RDisplay(body.html(), HTML, SUCCESS)

    val bodyHtml = body.html()

    // Plain console output — no markers, no markup worth rendering.
    if (! bodyHtml.contains("<img")
      && ! bodyHtml.contains("<script")
      && ! bodyHtml.contains("%html ")
      && ! bodyHtml.contains("%table ")
      && ! bodyHtml.contains("%img ")
    ) {
      return textDisplay(body)
    }

    if (bodyHtml.contains("%table")) {
      return tableDisplay(body)
    }

    if (bodyHtml.contains("%img")) {
      return imgDisplay(body)
    }

    htmlDisplay(body, imageWidth)
  }

  private def textDisplay(body: Element): RDisplay = {
    // First paragraph holds the console output verbatim.
    RDisplay(body.getElementsByTag("p").get(0).html(), TEXT, SUCCESS)
  }

  /**
   * Strips `marker` and the stray "â" characters from the first <p>, then
   * removes R's index prefix.
   * NOTE(review): "â" looks like a mis-encoded marker byte coming from the
   * R side — confirm the intended character against the R output encoding.
   */
  private def firstParagraphWithout(body: Element, marker: String): String = {
    val p = body.getElementsByTag("p").get(0).html.replace(marker, "").replace("â", "")
    val prefix = (pattern findFirstIn p).getOrElse("")
    p.replace(prefix, "")
  }

  private def tableDisplay(body: Element): RDisplay = {
    // Unescape the tab/newline literals R printed into the paragraph.
    val table = firstParagraphWithout(body, "â%table ")
      .replace("\\t", "\t")
      .replace("\\n", "\n")
    RDisplay(table, TABLE, SUCCESS)
  }

  private def imgDisplay(body: Element): RDisplay = {
    RDisplay(firstParagraphWithout(body, "â%img "), IMG, SUCCESS)
  }

  private def htmlDisplay(body: Element, imageWidth: String): RDisplay = {

    // Rebuild the body from each child's outer HTML, stripping the %html
    // marker and any R index prefix found in its inner HTML.
    val div = body.children.map { element =>
      val outer = element.outerHtml().replace("â%html ", "").replace("â", "")
      val prefix = (pattern findFirstIn element.html()).getOrElse("")
      outer.replace(prefix, "")
    }.mkString

    // Protocol-relative URLs break outside http(s) contexts; pin to http.
    val content = div
      .replaceAll("src=\"//", "src=\"http://")
      .replaceAll("href=\"//", "href=\"http://")

    body.html(content)

    for (image <- body.getElementsByTag("img")) {
      image.attr("width", imageWidth)
    }

    RDisplay(body.html, HTML, SUCCESS)
  }

}
  /**
   * Whether SparkR tests may run against the spark interpreter under test.
   * Set alongside {@code pySpark} when a usable spark.home was detected —
   * presumably any local Spark distribution implies SparkR support; the
   * version gate is applied by the callers. TODO(review): confirm.
   */
  boolean isSparkR() {
    return sparkR;
  }
ZeppelinSparkClusterTest extends AbstractTestRestApi { } @Test + public void sparkRTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(); + int sparkVersion = getSparkVersionNumber(note); + + if (isSparkR() && sparkVersion >= 14) { // sparkr supported from 1.4.0 + // run markdown paragraph, again + Paragraph p = note.addParagraph(); + Map config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + + "df <- createDataFrame(sqlContext, localDF)\n" + + "count(df)" + ); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[1] 3", p.getResult().message()); + } + ZeppelinServer.notebook.removeNote(note.id()); + } + + @Test public void pySparkTest() throws IOException { // create new note Note note = ZeppelinServer.notebook.createNote(); @@ -230,4 +254,4 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]); return version; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/zeppelin-zengine/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index 10a69a4..6c0275b 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -52,11 +52,11 @@ <version>${project.version}</version> </dependency> - <dependency> + <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-s3</artifactId> <version>1.10.62</version> - </dependency> + </dependency> <dependency> <groupId>com.microsoft.azure</groupId> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java 
---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index fc11a41..eafbbbe 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -451,6 +451,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_WAR_TEMPDIR("zeppelin.war.tempdir", "webapps"), ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "org.apache.zeppelin.spark.SparkInterpreter," + "org.apache.zeppelin.spark.PySparkInterpreter," + + "org.apache.zeppelin.spark.SparkRInterpreter," + "org.apache.zeppelin.spark.SparkSqlInterpreter," + "org.apache.zeppelin.spark.DepInterpreter," + "org.apache.zeppelin.markdown.Markdown," http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d6cc7e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 7df7bb3..269b54a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -379,10 +379,10 @@ public class InterpreterFactory { List<InterpreterSetting.InterpreterInfo> interpreterInfos = new LinkedList<InterpreterSetting.InterpreterInfo>(); - for (RegisteredInterpreter registeredInterpreter : - Interpreter.registeredInterpreters.values()) { - if (registeredInterpreter.getGroup().equals(groupName)) { - for (String className : interpreterClassList) { + for (String className : interpreterClassList) { 
+ for (RegisteredInterpreter registeredInterpreter : + Interpreter.registeredInterpreters.values()) { + if (registeredInterpreter.getGroup().equals(groupName)) { if (registeredInterpreter.getClassName().equals(className)) { interpreterInfos.add( new InterpreterSetting.InterpreterInfo(
