http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/pom.xml ---------------------------------------------------------------------- diff --git a/r/pom.xml b/r/pom.xml new file mode 100644 index 0000000..911db10 --- /dev/null +++ b/r/pom.xml @@ -0,0 +1,396 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.6.0-incubating-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>zeppelin-zrinterpreter</artifactId> + <packaging>jar</packaging> + <name>Zeppelin: R Interpreter</name> + <description>R Interpreter for Zeppelin</description> + + <properties> + <script.extension>.sh</script.extension> + <path.separator>/</path.separator> + <spark.version>1.4.1</spark.version> + <scala.version>2.10.4</scala.version> + <scala.binary.version>2.10</scala.binary.version> + </properties> + + <developers> + <developer> + <id>amos</id> + <name>Amos Elberg</name> + </developer> + </developers> + + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-spark-dependencies</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-spark</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + 
<artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-repl_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <version>2.2.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <version>1.12.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <!-- jsoup HTML parser library @ http://jsoup.org/ --> + <groupId>org.jsoup</groupId> + <artifactId>jsoup</artifactId> + <version>[1.8.0,)</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + 
<dependency> + <groupId>org.datanucleus</groupId> + <artifactId>datanucleus-core</artifactId> + <scope>test</scope> + <version>3.2.10</version> + </dependency> + <dependency> + <groupId>org.datanucleus</groupId> + <artifactId>datanucleus-api-jdo</artifactId> + <version>3.2.6</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.datanucleus</groupId> + <artifactId>datanucleus-rdbms</artifactId> + <version>3.2.9</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>lib/**</exclude> + <exclude>**/r/lib/**</exclude> + <!--The following files are subject to the BSD-license or variants, + as shown in the file headers--> + <exclude>**/R/rzeppelin/R/globals.R</exclude> + <exclude>**/R/rzeppelin/R/common.R</exclude> + <exclude>**/R/rzeppelin/R/protocol.R</exclude> + <exclude>**/R/rzeppelin/R/rServer.R</exclude> + <exclude>**/R/rzeppelin/R/scalaInterpreter.R</exclude> + <exclude>**/R/rzeppelin/R/zzz.R</exclude> + <exclude>**/scala/Console.scala</exclude> + <exclude>**/zeppelin/rinterpreter/rscala/Package.scala</exclude> + <exclude>**/zeppelin/rinterpreter/rscala/RClient.scala</exclude> + <!--End of files subject to BSD-license.--> + <exclude>**/.idea/</exclude> + <!--The following files are mechanical--> + <exclude>**/R/rzeppelin/DESCRIPTION</exclude> + <exclude>**/R/rzeppelin/NAMESPACE</exclude> + <!--End of mechanical R files--> + <exclude>**/*.iml</exclude> + <exclude>.gitignore</exclude> + <exclude>**/.settings/*</exclude> + <exclude>**/.classpath</exclude> + <exclude>**/.project</exclude> + <exclude>**/target/**</exclude> + <exclude>**/derby.log</exclude> + <exclude>**/metastore_db/</exclude> + <exclude>**/README.md</exclude> + <exclude>**/dependency-reduced-pom.xml</exclude> + </excludes> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-deploy-plugin</artifactId> + <version>2.7</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.17</version> + <configuration> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + <argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine> + <skipTests>true</skipTests> + </configuration> + </plugin> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>1.0</version> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <filereports>testoutput.txt</filereports> + <parallel>false</parallel> + <forkMode>once</forkMode> + <systemProperties> + <scala.usejavacp>true</scala.usejavacp> + </systemProperties> + </configuration> + <executions> + <execution> + <id>test</id> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.3</version> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>org/datanucleus/**</exclude> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>reference.conf</resource> + </transformer> + </transformers> + </configuration> + <executions> + <execution> + 
<phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + </plugin> + + <!-- Deploy datanucleus jars to the interpreter/spark directory --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/spark</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + <type>${project.packaging}</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Plugin to compile Scala code --> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>test-compile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test-compile</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <executions> + <execution> + <phase>compile</phase> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>R${path.separator}install-dev${script.extension}</executable> + </configuration> + </plugin> + <plugin> + <artifactId>maven-clean-plugin</artifactId> + <version>2.6.1</version> + <configuration> + <filesets> + <fileset> + 
<directory>${project.build.directory}/../../R</directory> + <includes> + <include>**/lib/**</include> + </includes> + </fileset> + <fileset> + <directory>${project.build.directory}/../../interpreter/spark</directory> + <includes> + <include>**/zeppelin-zr*.jar</include> + </includes> + </fileset> + </filesets> + </configuration> + </plugin> + </plugins> + </build> +</project>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/java/org/apache/zeppelin/rinterpreter/KnitR.java ---------------------------------------------------------------------- diff --git a/r/src/main/java/org/apache/zeppelin/rinterpreter/KnitR.java b/r/src/main/java/org/apache/zeppelin/rinterpreter/KnitR.java new file mode 100644 index 0000000..63c60e2 --- /dev/null +++ b/r/src/main/java/org/apache/zeppelin/rinterpreter/KnitR.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rinterpreter; + +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.scheduler.Scheduler; + +import java.net.URL; +import java.util.List; +import java.util.Properties; + +/** + * KnitR is a simple wrapper around KnitRInterpreter to handle that Zeppelin prefers + * to load interpreters through classes defined in Java with static methods that run + * when the class is loaded. 
+ * + */ +public class KnitR extends Interpreter implements WrappedInterpreter { + KnitRInterpreter intp; + + static { + Interpreter.register("knitr", "spark", KnitR.class.getName(), + RInterpreter.getProps() + ); + } + + public KnitR(Properties property, Boolean startSpark) { + super(property); + intp = new KnitRInterpreter(property, startSpark); + } + public KnitR(Properties property) { + this(property, true); + } + + public KnitR() { + this(new Properties()); + } + + @Override + public void open() { + intp.open(); + } + + @Override + public void close() { + intp.close(); + } + + @Override + public InterpreterResult interpret(String s, InterpreterContext interpreterContext) { + return intp.interpret(s, interpreterContext); + } + + @Override + public void cancel(InterpreterContext interpreterContext) { + intp.cancel(interpreterContext); + } + + @Override + public FormType getFormType() { + return intp.getFormType(); + } + + @Override + public int getProgress(InterpreterContext interpreterContext) { + return intp.getProgress(interpreterContext); + } + + @Override + public List<String> completion(String s, int i) { + return intp.completion(s, i); + } + + @Override + public Interpreter getInnerInterpreter() { + return intp; + } + + @Override + public Scheduler getScheduler() { + return intp.getScheduler(); + } + + @Override + public void setProperty(Properties property) { + super.setProperty(property); + intp.setProperty(property); + } + + @Override + public Properties getProperty() { + return intp.getProperty(); + } + + @Override + public String getProperty(String key) { + return intp.getProperty(key); + } + + @Override + public void setInterpreterGroup(InterpreterGroup interpreterGroup) { + super.setInterpreterGroup(interpreterGroup); + intp.setInterpreterGroup(interpreterGroup); + } + + @Override + public InterpreterGroup getInterpreterGroup() { + return intp.getInterpreterGroup(); + } + + @Override + public void setClassloaderUrls(URL[] classloaderUrls) { + 
intp.setClassloaderUrls(classloaderUrls); + } + + @Override + public URL[] getClassloaderUrls() { + return intp.getClassloaderUrls(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/java/org/apache/zeppelin/rinterpreter/RRepl.java ---------------------------------------------------------------------- diff --git a/r/src/main/java/org/apache/zeppelin/rinterpreter/RRepl.java b/r/src/main/java/org/apache/zeppelin/rinterpreter/RRepl.java new file mode 100644 index 0000000..55f7219 --- /dev/null +++ b/r/src/main/java/org/apache/zeppelin/rinterpreter/RRepl.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rinterpreter; + +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.scheduler.Scheduler; + +import java.net.URL; +import java.util.List; +import java.util.Properties; + +/** + * RRepl is a simple wrapper around RReplInterpreter to handle that Zeppelin prefers + * to load interpreters through classes defined in Java with static methods that run + * when the class is loaded. 
+ * + */ +public class RRepl extends Interpreter implements WrappedInterpreter { + RReplInterpreter intp; + + static { + Interpreter.register("r", "spark", RRepl.class.getName(), + RInterpreter.getProps() + ); + } + + public RRepl(Properties property, Boolean startSpark) { + super(property); + intp = new RReplInterpreter(property, startSpark); + } + public RRepl(Properties property) { + this(property, true); + } + + public RRepl() { + this(new Properties()); + } + + @Override + public void open() { + intp.open(); + } + + @Override + public void close() { + intp.close(); + } + + @Override + public InterpreterResult interpret(String s, InterpreterContext interpreterContext) { + return intp.interpret(s, interpreterContext); + } + + @Override + public void cancel(InterpreterContext interpreterContext) { + intp.cancel(interpreterContext); + } + + @Override + public FormType getFormType() { + return intp.getFormType(); + } + + @Override + public int getProgress(InterpreterContext interpreterContext) { + return intp.getProgress(interpreterContext); + } + + @Override + public List<String> completion(String s, int i) { + return intp.completion(s, i); + } + + @Override + public Interpreter getInnerInterpreter() { + return intp; + } + + @Override + public Scheduler getScheduler() { + return intp.getScheduler(); + } + + @Override + public void setProperty(Properties property) { + super.setProperty(property); + intp.setProperty(property); + } + + @Override + public Properties getProperty() { + return intp.getProperty(); + } + + @Override + public String getProperty(String key) { + return intp.getProperty(key); + } + + @Override + public void setInterpreterGroup(InterpreterGroup interpreterGroup) { + super.setInterpreterGroup(interpreterGroup); + intp.setInterpreterGroup(interpreterGroup); + } + + @Override + public InterpreterGroup getInterpreterGroup() { + return intp.getInterpreterGroup(); + } + + @Override + public void setClassloaderUrls(URL[] classloaderUrls) { + 
intp.setClassloaderUrls(classloaderUrls); + } + + @Override + public URL[] getClassloaderUrls() { + return intp.getClassloaderUrls(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/java/org/apache/zeppelin/rinterpreter/RStatics.java ---------------------------------------------------------------------- diff --git a/r/src/main/java/org/apache/zeppelin/rinterpreter/RStatics.java b/r/src/main/java/org/apache/zeppelin/rinterpreter/RStatics.java new file mode 100644 index 0000000..361fe47 --- /dev/null +++ b/r/src/main/java/org/apache/zeppelin/rinterpreter/RStatics.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* +The purpose of this class is to provide something for R to call through the backend +to bootstrap. 
+ */ + +package org.apache.zeppelin.rinterpreter; + +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import org.apache.zeppelin.spark.ZeppelinContext; + +/** + * RStatics provides static class methods that can be accessed through the SparkR bridge + * + */ +public class RStatics { + private static SparkContext sc = null; + private static ZeppelinContext z = null; + private static SQLContext sql = null; + private static RContext rCon = null; + + public static SparkContext setSC(SparkContext newSC) { + sc = newSC; + return sc; + } + + public static ZeppelinContext setZ(ZeppelinContext newZ) { + z = newZ; + return z; + } + + public static SQLContext setSQL(SQLContext newSQL) { + sql = newSQL; + return sql; + } + + public static JavaSparkContext getJSC() { + return new JavaSparkContext(sc); + } + + public static SparkContext getSC() { + return sc; + } + + public static SQLContext getSQL() { + return sql; + } + + public static Object getZ(String name) { + return z.get(name); + } + + public static void putZ(String name, Object obj) { + z.put(name, obj); + } + + public static RContext getRCon() { + return rCon; + } + public static RContext setrCon(RContext newrCon) { + rCon = newrCon; + return rCon; + } + public static Boolean testRDD(String name) { + Object x = z.get(name); + return (x instanceof org.apache.spark.api.java.JavaRDD); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/org/apache/spark/api/r/RBackendHelper.scala ---------------------------------------------------------------------- diff --git a/r/src/main/scala/org/apache/spark/api/r/RBackendHelper.scala b/r/src/main/scala/org/apache/spark/api/r/RBackendHelper.scala new file mode 100644 index 0000000..9c1eb38 --- /dev/null +++ b/r/src/main/scala/org/apache/spark/api/r/RBackendHelper.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* +With grattitude to Shivaram for advice regarding how to get SparkR talking to an existing SparkContext in Java + */ +package org.apache.spark.api.r + +class RBackendHelper(val backend : RBackend) { + + + + def close() : Unit = backend.close() + + + var port : Int = 0 + + def init() : Int = { + port = backend.init() + port + } + + val backendThread : Thread = new Thread("SparkR backend") { + override def run() { + backend.run() + } + } + + def start() : Thread = { + if (port == 0) throw new RuntimeException("BackendHelper must be initialized before starting") + if (!backendThread.isAlive) backendThread.start() + backendThread + } + + +/* +The sequence is: +1. Before initializing spark in R, after loading library, Backend goes up and starts listening. (Note that its able to execute arbitrary methods!!! We can use it for +zeppelin context!!!) +2. Tell SparkR to make a connection to the backend, setting the EXISTING port to the one in backendhelper. +3. Track sparkR.init, but where it calls spark/R/pkg/R/sparkR.R calls org.apache.spark.api.r.RRDD.createSparkContext to get sc, +which is then returned as a jobj link, instead call RBackendHelper.getSC + 3a Actually the object returned right now is of type JavaSparkContext ????? 
Need to understand this +4. SparkR for the other contexts calls related methods, org.apache.spark.sql.api.r.SQLUtils.createSQLContext and +org.apache.spark.sql.hive.HiveContext is just made new, with the jobj reference assigned to an object. We should track +the same pattern as above. + + + */ +} + + +object RBackendHelper { + +/* +This function creates a new SparkContext, but does not register it, based on whatever properties are provided. +Its for testing purposes and should never be called + */ +// def buildSparkContext( props : Properties) : SparkContext = { +// val traversableProps : Traversable[(String, String)] = propertiesAsScalaMap(props) +// val conf = new SparkConf().setAll(traversableProps) +// conf.setIfMissing("spark.master", "local") +// conf.setIfMissing("spark.app.name", "ZeppelinRContext") +// conf.validateSettings() +// new SparkContext(conf) +// } + + def apply() : RBackendHelper = new RBackendHelper(new RBackend()) + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/org/apache/zeppelin/rinterpreter/KnitRInterpreter.scala ---------------------------------------------------------------------- diff --git a/r/src/main/scala/org/apache/zeppelin/rinterpreter/KnitRInterpreter.scala b/r/src/main/scala/org/apache/zeppelin/rinterpreter/KnitRInterpreter.scala new file mode 100644 index 0000000..bc779c7 --- /dev/null +++ b/r/src/main/scala/org/apache/zeppelin/rinterpreter/KnitRInterpreter.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rinterpreter + + +// TODO: Capture the knitr progress bar + +import java.util._ + +import org.apache.zeppelin.interpreter.InterpreterContext +import org.apache.zeppelin.interpreter.InterpreterResult +import org.apache.zeppelin.rinterpreter.rscala.RException + + +class KnitRInterpreter(property: Properties, startSpark : Boolean = true) extends RInterpreter(property, startSpark) { + def this(property : Properties) = { + this(property, true) + } + + override def open: Unit = { + logger.trace("Opening knitr") + rContext.synchronized { + super.open + logger.debug("Knitr open, initial commands") + rContext.testRPackage("knitr", true, true, "Without knitr, the knitr interpreter cannot run.") + rContext.eval( + """opts_knit$set(out.format = 'html', + |results='asis', + |progress = FALSE, + |self.contained = TRUE, + |verbose = FALSE, + |comment = NA, + |echo = FALSE, + |tidy = FALSE) + | """.stripMargin) + } + logger.info("KnitR: Finished initial commands") + } + + def interpret(st: String, context: InterpreterContext): InterpreterResult = try { + logger.trace("interpreting" + st) + // need to convert st into an array of Strings within R + val commandSt : Array[String] = st.split("\n") + val chunkOptions = commandSt.head + val chunkLine : String = s"```{r $chunkOptions}" + val chunk : Array[String] = Array(chunkLine) ++: commandSt.tail ++: Array("```") + val out: String = rContext.synchronized { + rContext.set(".zeppknitrinput", chunk) + rContext.eval(".knitout <- knit2html(text=.zeppknitrinput, envir = 
rzeppelin:::.zeppenv)") + rContext.getS0(".knitout") + } + + new InterpreterResult(InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.HTML, + RInterpreter.processHTML(out) + ) + } catch { + case r: RException => r.getInterpreterResult(st) + case e: Exception => new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/org/apache/zeppelin/rinterpreter/RContext.scala ---------------------------------------------------------------------- diff --git a/r/src/main/scala/org/apache/zeppelin/rinterpreter/RContext.scala b/r/src/main/scala/org/apache/zeppelin/rinterpreter/RContext.scala new file mode 100644 index 0000000..ffab160 --- /dev/null +++ b/r/src/main/scala/org/apache/zeppelin/rinterpreter/RContext.scala @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.rinterpreter + +import java.io._ +import java.nio.file.{Files, Paths} +import java.util.Properties + +import org.apache.spark.SparkContext +import org.apache.spark.api.r.RBackendHelper +import org.apache.spark.sql.SQLContext +import org.apache.zeppelin.interpreter._ +import org.apache.zeppelin.rinterpreter.rscala.RClient._ +import org.apache.zeppelin.rinterpreter.rscala._ +import org.apache.zeppelin.scheduler._ +import org.apache.zeppelin.spark.{SparkInterpreter, ZeppelinContext} +import org.slf4j._ + +import scala.collection.JavaConversions._ + +// TODO: Setup rmr, etc. +// TODO: Stress-test spark. What happens on close? Etc. + +private[rinterpreter] class RContext(private val sockets: ScalaSockets, + debug: Boolean) extends RClient(sockets.in, sockets.out, debug) { + + private val logger: Logger = RContext.logger + lazy val getScheduler: Scheduler = SchedulerFactory.singleton().createOrGetFIFOScheduler(this.hashCode().toString) + + val backend: RBackendHelper = RBackendHelper() + private var sc: Option[SparkContext] = None + private var sql: Option[SQLContext] = None + private var z: Option[ZeppelinContext] = None + + val rPkgMatrix = collection.mutable.HashMap[String,Boolean]() + + var isOpen: Boolean = false + private var isFresh : Boolean = true + + private var property: Properties = null + private[rinterpreter] var sparkRStarted : Boolean = false + + override def toString() : String = s"""${super.toString()} + |\t Open: $isOpen Fresh: $isFresh SparkStarted: $sparkRStarted + |\t Progress: $progress + |\t Sockets: ${sockets.toString()} + """.stripMargin + + var progress: Int = 0 + + def getProgress: Int = { + return progress + } + + def setProgress(i: Int) : Unit = { + progress = i % 100 + } + + def incrementProgress(i: Int) : Unit = { + progress = (progress + i) % 100 + } + + // handle properties this way so it can be a mutable object shared with the R Interpreters + def setProperty(properties: Properties): Unit = 
synchronized { + if (property == null) property = properties + else property.putAll(properties) + } + + def open(startSpark : Option[SparkInterpreter]): Unit = synchronized { + if (isOpen && sparkRStarted) { + logger.trace("Reusing rContext.") + return + } + testRPackage("rzeppelin", fail = true, message = + "The rinterpreter cannot run without the rzeppelin package, which was included in your distribution.") + startSpark match { + case Some(x : SparkInterpreter) => { + sparkStartup(x) + } + case _ => logger.error("Could not find a SparkInterpreter") + } + isOpen = true + } + private def sparkStartup(startSpark : SparkInterpreter): Unit = try { + val sparkHome: String = System.getenv("SPARK_HOME") match { + case null => { + logger.error("SPARK_HOME is not set. The R Interpreter will start without Spark.") + return + } + case y => y + } + testRPackage("SparkR", fail = true, path = sparkHome) + if (startSpark.getSparkVersion() == null) throw new RuntimeException("No spark version") + if (!startSpark.getSparkVersion().isSparkRSupported) throw new RuntimeException("SparkR requires Spark 1.4 or later") + sc = Some(startSpark.getSparkContext()) + sql = Some(startSpark.getSQLContext()) + z = Some(startSpark.getZeppelinContext()) + logger.trace("Registered Spark Contexts") + backend.init() + backend.start() + if (!backend.backendThread.isAlive) throw new RuntimeException("SparkR could not startup because the Backend Thread is not alive") + logger.trace("Started Spark Backend") + eval( s"""SparkR:::connectBackend("localhost", ${backend.port})""") + logger.trace("SparkR backend connected") + initializeSparkR(sc.get, sql.get, z.get) + logger.info("Initialized SparkR") + sparkRStarted = true + } catch { + case e: Exception => throw new RuntimeException(""" + Could not connect R to Spark. 
If the stack trace is not clear, + check whether SPARK_HOME is set properly.""", e) + } + + private def initializeSparkR(sc : SparkContext, sql : SQLContext, z : ZeppelinContext) : Unit = synchronized { + + logger.trace("Getting a handle to the JavaSparkContext") + + eval("assign(\".scStartTime\", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)") + RStatics.setSC(sc) + eval( + """ + |assign( + |".sparkRjsc", + |SparkR:::callJStatic("org.apache.zeppelin.rinterpreter.RStatics", + | "getJSC"), + | envir = SparkR:::.sparkREnv)""".stripMargin) + + eval("assign(\"sc\", get(\".sparkRjsc\", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)") + + logger.trace("Established SparkR Context") + + val sqlEnvName = sql match { + case null => throw new RuntimeException("Tried to initialize SparkR without setting a SQLContext") + case x : org.apache.spark.sql.hive.HiveContext => ".sparkRHivesc" + case x : SQLContext => ".sparkRSQLsc" + } + RStatics.setSQL(sql) + eval( + s""" + |assign( + |"${sqlEnvName}", + |SparkR:::callJStatic("org.apache.zeppelin.rinterpreter.RStatics", + | "getSQL"), + | envir = SparkR:::.sparkREnv)""".stripMargin) + eval( + s""" + |assign("sqlContext", + |get("$sqlEnvName", + |envir = SparkR:::.sparkREnv), + |envir = .GlobalEnv) + """.stripMargin) + + logger.trace("Proving spark") + val proof = evalS1("names(SparkR:::.sparkREnv)") + logger.info("Proof of spark is : " + proof.mkString) + + RStatics.setZ(z) + + RStatics.setrCon(this) + eval( + s""" + |assign(".rContext", + | SparkR:::callJStatic("org.apache.zeppelin.rinterpreter.RStatics", + | "getRCon"), + | envir = .GlobalEnv) + """.stripMargin + ) + } + + def close(): Unit = synchronized { + if (isOpen) { + if (sparkRStarted) { + try { + eval("SparkR:::sparkR.stop()") + } catch { + case e: RException => {} + case e: Exception => logger.error("Error closing SparkR", e) + } + } + try { + backend.close + backend.backendThread.stop() + } catch { + case e: Exception => logger.error("Error closing RContext ", e) 
+ } + try { + exit() + } catch { + case e: Exception => logger.error("Shutdown error", e) + } + } + isOpen = false + } + + + private[rinterpreter] def testRPackage(pack: String, + fail: Boolean = false, + license: Boolean = false, + message: String = "", + path : String = ""): Boolean = synchronized { + + + rPkgMatrix.get(pack) match { + case Some(x: Boolean) => return x + case None => {} + } + + evalB0( s"""require('$pack',quietly=TRUE, lib.loc="$path/R/lib/")""") match { + case true => { + rPkgMatrix.put(pack, true) + return (true) + } + case false => { + evalB0(s"require('$pack', quietly=TRUE)") match { + case true => { + rPkgMatrix.put(pack, true) + return true + } + case false => { + rPkgMatrix.put(pack, false) + val failMessage = + s"""The $pack package could not be loaded. """ + { + if (license) "We cannot install it for you because it is published under the GPL3 license." + else "" + } + message + logger.error(failMessage) + if (fail) throw new RException(failMessage) + return (false) + } + } + } + } + } + + logger.info("RContext Finished Starting") +} + +object RContext { + val logger: Logger = LoggerFactory.getLogger(getClass) + + logger.trace("Inside the RContext Object") + private val contextMap : collection.mutable.HashMap[String, RContext] = collection.mutable.HashMap[String,RContext]() + + // This function is here to work around inconsistencies in the SparkInterpreter startup sequence + // that caused testing issues + private[rinterpreter] def resetRcon() : Boolean = synchronized { + contextMap foreach((con) => { + con._2.close() + if (con._2.isOpen) throw new RuntimeException("Failed to close an existing RContext") + contextMap.remove(con._1) + }) + return true + } + + def apply( property: Properties, id : String): RContext = synchronized { + contextMap.get(id) match { + case Some(x : RContext) if x.isFresh || x.isOpen => return(x) + case Some(x : RContext) => resetRcon() + case _ => {} + } + val debug: Boolean = property.getProperty("rscala.debug", 
"false").toBoolean + val timeout: Int = property.getProperty("rscala.timeout", "60").toInt + import scala.sys.process._ + logger.trace("Creating processIO") + var cmd: PrintWriter = null + val command = RClient.defaultRCmd +: RClient.defaultArguments + val processCmd = Process(command) + + val processIO = new ProcessIO( + o => { + cmd = new PrintWriter(o) + }, + reader("STDOUT DEBUG: "), + reader("STDERR DEBUG: "), + true + ) + val portsFile = File.createTempFile("rscala-", "") + val processInstance = processCmd.run(processIO) + // Find rzeppelin + val libpath : String = if (Files.exists(Paths.get("R/lib"))) "R/lib" + else if (Files.exists(Paths.get("../R/lib"))) "../R/lib" + else throw new RuntimeException("Could not find rzeppelin - it must be in either R/lib or ../R/lib") + val snippet = + s""" +library(lib.loc="$libpath", rzeppelin) +rzeppelin:::rServe(rzeppelin:::newSockets('${portsFile.getAbsolutePath.replaceAll(File.separator, "/")}',debug=${if (debug) "TRUE" else "FALSE"},timeout=${timeout})) +q(save='no')""" + while (cmd == null) Thread.sleep(100) + cmd.println(snippet) + cmd.flush() + val sockets = RClient.makeSockets(portsFile.getAbsolutePath) + sockets.out.writeInt(RClient.Protocol.OK) + sockets.out.flush() + val packVersion = RClient.readString(sockets.in) + if (packVersion != org.apache.zeppelin.rinterpreter.rscala.Version) { + logger.warn("Connection to R started but versions don't match " + packVersion + " " + org.apache.zeppelin.rinterpreter.rscala.Version) + } else { + logger.trace("Connected to a new R Session") + } + val context = new RContext(sockets, debug) + context.setProperty(property) + contextMap.put(id, context) + context + } +} + http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/org/apache/zeppelin/rinterpreter/RInterpreter.scala ---------------------------------------------------------------------- diff --git a/r/src/main/scala/org/apache/zeppelin/rinterpreter/RInterpreter.scala 
b/r/src/main/scala/org/apache/zeppelin/rinterpreter/RInterpreter.scala new file mode 100644 index 0000000..2859f30 --- /dev/null +++ b/r/src/main/scala/org/apache/zeppelin/rinterpreter/RInterpreter.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.rinterpreter + +import java.nio.file.{Files, Paths} +import java.util._ + +import org.apache.commons.codec.binary.{Base64, StringUtils} +import org.apache.zeppelin.interpreter.Interpreter.FormType +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter +import org.apache.zeppelin.interpreter.{InterpreterContext, _} +import org.apache.zeppelin.scheduler.Scheduler +import org.apache.zeppelin.spark.SparkInterpreter +import org.jsoup.Jsoup +import org.jsoup.nodes._ +import org.jsoup.select.Elements +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConversions._ +import scala.io.Source + +abstract class RInterpreter(properties : Properties, startSpark : Boolean = true) extends Interpreter (properties) { + + protected val logger: Logger = RInterpreter.logger + logger.trace("Initialising an RInterpreter of class " + this.getClass.getName) + + def getrContext: RContext = rContext + + protected lazy val rContext : RContext = synchronized{ RContext(property, this.getInterpreterGroup().getId()) } + + def open: Unit = rContext.synchronized { + logger.trace("RInterpreter opening") + // We leave this as an Option[] because the pattern of nesting SparkInterpreter inside of wrapper interpreters + // has changed several times, and this allows us to fail more gracefully and handle those changes in one place. + val intp : Option[SparkInterpreter] = getSparkInterpreter() + rContext.open(intp) + rContext.testRPackage("htmltools", message = + """You can continue + | without it, but some interactive visualizations will fail. 
+ | You can install it from cran."""") + rContext.testRPackage("repr", license = true, message = + """You can continue + | without it, but some forms of output from the REPL may not appear properly."""") + rContext.testRPackage("base64enc", license = true, message = + """You can continue + | without it, but the REPL may not show images properly.""") + rContext.testRPackage("evaluate", license = false, message = + """ + |The REPL needs this to run. It can be installed from CRAN + | Thanks to Hadley Wickham and Yihui Xie for graciously making evaluate available under an Apache-compatible + | license so it can be used with this project.""".stripMargin) + } + + def getSparkInterpreter() : Option[SparkInterpreter] = + getSparkInterpreter(getInterpreterInTheSameSessionByClassName(classOf[SparkInterpreter].getName)) + + def getSparkInterpreter(p1 : Interpreter) : Option[SparkInterpreter] = p1 match { + case s : SparkInterpreter => Some[SparkInterpreter](s) + case lzy : LazyOpenInterpreter => { + val p = lzy.getInnerInterpreter + lzy.open() + return getSparkInterpreter(p) + } + case w : WrappedInterpreter => return getSparkInterpreter(w.getInnerInterpreter) + case _ => None + } + + def close: Unit = { + rContext.close + } + + def getProgress(context :InterpreterContext): Int = rContext.getProgress + + def cancel(context:InterpreterContext) : Unit = {} + + def getFormType: FormType = { + return FormType.NONE + } + + override def getScheduler : Scheduler = rContext.getScheduler + + // TODO: completion is disabled because it could not be tested with current Zeppelin code + def completion(buf :String,cursor : Int) : List[String] = Array[String]("").toList + + private[rinterpreter] def hiddenCompletion(buf :String,cursor : Int) : List[String] = + rContext.evalS1(s""" + |rzeppelin:::.z.completion("$buf", $cursor) + """.stripMargin).toList +} + +object RInterpreter { + + private val logger: Logger = LoggerFactory.getLogger(getClass) + logger.trace("logging inside the RInterpreter 
singleton") + + // These are the additional properties we need on top of the ones provided by the spark interpreters + lazy val props: Map[String, InterpreterProperty] = new InterpreterPropertyBuilder() + .add("rhadoop.cmd", SparkInterpreter.getSystemDefault("rhadoop.cmd", "HADOOP_CMD", ""), "Usually /usr/bin/hadoop") + .add("rhadooop.streamingjar", SparkInterpreter.getSystemDefault("rhadoop.cmd", "HADOOP_STREAMING", ""), "Usually /usr/lib/hadoop/contrib/streaming/hadoop-streaming-<version>.jar") + .add("rscala.debug", SparkInterpreter.getSystemDefault("rscala.debug","RSCALA_DEBUG", "false"), "Whether to turn on rScala debugging") // TEST: Implemented but not tested + .add("rscala.timeout", SparkInterpreter.getSystemDefault("rscala.timeout","RSCALA_TIMEOUT", "60"), "Timeout for rScala") // TEST: Implemented but not tested + .build + + def getProps() = { + props + } + + // Some R interactive visualization packages insist on producing HTML that refers to javascript + // or css by file path. These functions are intended to load those files and embed them into the + // HTML as Base64 encoded DataURIs. 
+ //FIXME These don't error but may not yet properly be converting script links + def scriptToBase(doc : Element, testAttr : String, tag : String, mime : String): Unit = { + val elems : Elements = doc.getElementsByTag(tag) + elems.filter( (e : Element) => { + e.attributes().hasKey(testAttr) && e.attr(testAttr) != "" && e.attr(testAttr).slice(0,1) == "/" + }).foreach(scriptToBase(_, testAttr, mime)) + } + + def scriptToBase(node : Element, field : String, mime : String) : Unit = node.attr(field) match { + case x if Files.exists(Paths.get(x)) => node.attr(field, dataURI(x, mime)) + case x if x.slice(0,4) == "http" => {} + case x if x.contains("ajax") => {} + case x if x.contains("googleapis") => {} + case x if x.slice(0,2) == "//" => node.attr(field, "http:" + x) + case _ => {} + } + + def dataURI(file : String, mime : String) : String = { + val data: String = Source.fromFile(file).getLines().mkString("\n") + s"""data:${mime};base64,""" + StringUtils.newStringUtf8(Base64.encodeBase64(data.getBytes(), false)) + } + + // The purpose here is to deal with knitr producing HTML with script and css tags outside the <body> + def processHTML(input: Array[String]): String = processHTML(input.mkString("\n")) + + def processHTML(input: String) : String = { + val doc : Document = Jsoup.parse(input) + processHTML(doc) + } + + private def processHTML(doc : Document) : String = { + val bod : Element = doc.body() + val head : Element = doc.head() + // Try to ignore the knitr script that breaks zeppelin display + head.getElementsByTag("script").reverseIterator.foreach(bod.prependChild(_)) + // Only get css from head if it links to a file + head.getElementsByTag("link").foreach(bod.prependChild(_)) + scriptToBase(bod, "href", "link", "text/css") + scriptToBase(bod, "src", "script", "text/javascript") + bod.html() + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/org/apache/zeppelin/rinterpreter/RReplInterpreter.scala 
---------------------------------------------------------------------- diff --git a/r/src/main/scala/org/apache/zeppelin/rinterpreter/RReplInterpreter.scala b/r/src/main/scala/org/apache/zeppelin/rinterpreter/RReplInterpreter.scala new file mode 100644 index 0000000..63be302 --- /dev/null +++ b/r/src/main/scala/org/apache/zeppelin/rinterpreter/RReplInterpreter.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.rinterpreter + + +// TODO: Option for setting size of output images + +import java.util._ + +import org.apache.zeppelin.interpreter.InterpreterContext +import org.apache.zeppelin.interpreter.InterpreterResult +import org.apache.zeppelin.rinterpreter.rscala.RException + +class RReplInterpreter(property: Properties, startSpark : Boolean = true) extends RInterpreter(property, startSpark) { + + // protected val rContext : RContext = RContext(property) + + def this(property : Properties) = { + this(property, true) + } + private var firstCell : Boolean = true + def interpret(st: String, context: InterpreterContext): InterpreterResult = { + rContext.synchronized { + try { + import scala.collection.immutable._ + logger.info("intrpreting " + st) + rContext.set(".zreplin", st.split("\n")) + rContext.eval(".zreplout <- rzeppelin:::.z.valuate(.zreplin)") + + val reslength: Int = rContext.evalI0("length(.zreplout)") + logger.debug("Length of evaluate result is " + reslength) + var gotError: Boolean = false + val result: String = List.range(1, reslength + 1).map((i: Int) => { + rContext.evalS1(s"class(.zreplout[[${i}]])") match { + case x: Array[String] if x contains ("recordedplot") => { + if (!rContext.testRPackage("repr", fail = false)) return new InterpreterResult(InterpreterResult.Code.ERROR, + InterpreterResult.Type.TEXT, + "Displaying images through the R REPL requires the repr package, which is not installed.") + val image: String = rContext.evalS0(s"base64enc:::base64encode(repr:::repr_jpg(.zreplout[[${i}]]))") + return new InterpreterResult(InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.IMG, image) + } + //TODO: If the html contains a link to a file, transform it to a DataURI. 
This is necessary for htmlwidgets + case x: Array[String] if x contains ("html") => { + val html: String = RInterpreter.processHTML(rContext.evalS0(s"rzeppelin:::.z.repr(.zreplout[[${i}]])")) + return new InterpreterResult(InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.HTML, html) + } + case x: Array[String] if x contains "data.frame" => { + val table: Array[String] = rContext.evalS1( s"""rzeppelin:::.z.table(.zreplout[[${i}]])""") + return new InterpreterResult(InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TABLE, + table.mkString(sep = "\n")) + } + case x: Array[String] if x contains "source" => rContext.evalS0(s".zreplout[[${i}]]" + "$src") + case x: Array[String] if x contains "character" => rContext.evalS0(s".zreplout[[${i}]]") + case x: Array[String] if x contains "packageStartupMessage" => if (firstCell) {""} else { + firstCell = true + "Package Startup Message: " + rContext.evalS1(s"rzeppelin:::.z.repr(.zreplout[[${i}]])").mkString("\n") + } + case x: Array[String] if x contains "simpleError" => { + gotError = true + val error = rContext.evalS1(s"rzeppelin:::.z.repr(.zreplout[[${i}]])").mkString("\n") + logger.error(error) + error + } + case _ => rContext.evalS1(s"rzeppelin:::.z.repr(.zreplout[[${i}]])").mkString("\n") + } + }).mkString("\n\n") + return new InterpreterResult({ + if (!gotError) InterpreterResult.Code.SUCCESS + else InterpreterResult.Code.ERROR + }, result) + } catch { + case re: RException => return re.getInterpreterResult(st) + case e: Exception => { + logger.error("Error interpreting " + st, e) + return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage() + e.getStackTrace) + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/org/apache/zeppelin/rinterpreter/package.scala ---------------------------------------------------------------------- diff --git a/r/src/main/scala/org/apache/zeppelin/rinterpreter/package.scala 
b/r/src/main/scala/org/apache/zeppelin/rinterpreter/package.scala new file mode 100644 index 0000000..d354107 --- /dev/null +++ b/r/src/main/scala/org/apache/zeppelin/rinterpreter/package.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin + +// TODO: Keeping interpreter out of spark interpreter group for now, until the context sharing code is developed +// TEST: rmr2 +// TODO: Link getProgress to plyr (and knitr progress) if possible +// TODO: Forms? +// TODO: Completion? 
Currently commented-out +// TODO: It would be nice if the RReplInterpreter output svg instead of jpg, or intelligently selected, at a minimum +// TODO: Some kind of proxy may be necessary for shiny and widgets see http://blog.dominodatalab.com/interactive-dashboards-with-knitr-and-html-widgets/ + +package object rinterpreter { +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/Package.scala ---------------------------------------------------------------------- diff --git a/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/Package.scala b/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/Package.scala new file mode 100644 index 0000000..4028dd5 --- /dev/null +++ b/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/Package.scala @@ -0,0 +1,39 @@ +package org.apache.zeppelin.rinterpreter +/* +Copyright (c) 2013-2015, David B. Dahl, Brigham Young University + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the + distribution. + + Neither the name of the <ORGANIZATION> nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT +HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package object rscala { + + val Version = "0.1.0" + + val Date = "2015-05-15" + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/RClient.scala ---------------------------------------------------------------------- diff --git a/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/RClient.scala b/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/RClient.scala new file mode 100644 index 0000000..b73524e --- /dev/null +++ b/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/RClient.scala @@ -0,0 +1,527 @@ +/* +Copyright (c) 2013-2015, David B. Dahl, Brigham Young University + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the + distribution. + + Neither the name of the <ORGANIZATION> nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.apache.zeppelin.rinterpreter.rscala + +// TODO: Add libdir to constructor + +import java.io._ +import java.net.{InetAddress, ServerSocket} + +import org.slf4j.{Logger, LoggerFactory} + +import scala.language.dynamics + +class RClient (private val in: DataInputStream, + private val out: DataOutputStream, + val debug: Boolean = true) extends Dynamic { + var damagedState : Boolean = false + private val logger: Logger = LoggerFactory.getLogger(getClass) + + case class RObjectRef(val reference : String) { + override def toString() = ".$"+reference + } + + /** __For rscala developers only__: Sets whether debugging output should be displayed. 
*/ + def debug_=(v: Boolean) = { + if ( v != debug ) { + if ( debug ) logger.debug("Sending DEBUG request.") + out.writeInt(RClient.Protocol.DEBUG) + out.writeInt(if ( v ) 1 else 0) + out.flush() + } + } + + def exit() = { + logger.debug("Sending EXIT request.") + out.writeInt(RClient.Protocol.EXIT) + out.flush() + } + + def eval(snippet: String, evalOnly: Boolean = true): Any = try { + if (damagedState) throw new RException("Connection to R already damaged") + logger.debug("Sending EVAL request.") + out.writeInt(RClient.Protocol.EVAL) + RClient.writeString(out,snippet) + out.flush() + val status = in.readInt() + val output = RClient.readString(in) + if ( output != "" ) { + logger.error("R Error " + snippet + " " + output) + throw new RException(snippet, output) + } + if ( status != RClient.Protocol.OK ) throw new RException(snippet, output, "Error in R evaluation.") + if ( evalOnly ) null else get(".rzeppelin.last.value")._1 + } catch { + case e : java.net.SocketException => { + logger.error("Connection to R appears to have shut down" + e) + damagedState = true + } + } + + def evalI0(snippet: String) = { eval(snippet,true); getI0(".rzeppelin.last.value") } + + def evalB0(snippet: String) = { eval(snippet,true); getB0(".rzeppelin.last.value") } + + def evalS0(snippet: String) = { eval(snippet,true); getS0(".rzeppelin.last.value") } + + def evalI1(snippet: String) = { eval(snippet,true); getI1(".rzeppelin.last.value") } + + def evalB1(snippet: String) = { eval(snippet,true); getB1(".rzeppelin.last.value") } + + def evalS1(snippet: String) = { eval(snippet,true); getS1(".rzeppelin.last.value") } + + def evalR( snippet: String) = { eval(snippet,true); getR( ".rzeppelin.last.value") } + + def set(identifier: String, value: Any): Unit = set(identifier,value,"",true) + + def set(identifier: String, value: Any, index: String = "", singleBrackets: Boolean = true): Unit = { + if (damagedState) throw new RException("Connection to R already damaged") + val v = value + if ( 
// NOTE(review): SOURCE is a flattened git diff; this span begins mid-way through
// RClient.set(...) — the signature and the opening of the SET-variant selection
// are above this view.  `index`, `singleBrackets`, `v`, `value`, `identifier`,
// `in`, `out` and `logger` all come from that unseen context.
index == "" ) out.writeInt(RClient.Protocol.SET)
    else if ( singleBrackets ) {
      // single-bracket indexed assignment, e.g. x[i] <- v
      out.writeInt(RClient.Protocol.SET_SINGLE)
      RClient.writeString(out,index)
    } else {
      // double-bracket indexed assignment, e.g. x[[i]] <- v
      out.writeInt(RClient.Protocol.SET_DOUBLE)
      RClient.writeString(out,index)
    }
    RClient.writeString(out,identifier)
    if ( v == null || v.isInstanceOf[Unit] ) {
      logger.debug("... which is null")
      out.writeInt(RClient.Protocol.NULLTYPE)
      out.flush()
      // Only indexed assignments are acknowledged with a status int; a plain
      // SET gets no reply from the R side.
      if ( index != "" ) {
        val status = in.readInt()
        if ( status != RClient.Protocol.OK ) {
          val output = RClient.readString(in)
          if ( output != "" ) {
            logger.error("R error setting " + output)
            throw new RException(identifier + value.toString(), output, "Error setting")
          }
          throw new RException("Error in R evaluation. Set " + identifier + " to " + value.toString())
        }
      }
      return
    }
    val c = v.getClass
    logger.debug("... whose class is: "+c)
    logger.debug("... and whose value is: "+v)
    if ( c.isArray ) {
      // JVM runtime array class names: "[I" = Array[Int], "[Z" = Array[Boolean],
      // "[Ljava.lang.String;" = Array[String].
      // NOTE(review): Array[Double] ("[D") is not handled and falls through to
      // the error case, although `get` can RECEIVE double vectors.
      c.getName match {
        case "[I" =>
          val vv = v.asInstanceOf[Array[Int]]
          out.writeInt(RClient.Protocol.VECTOR)
          out.writeInt(vv.length)
          out.writeInt(RClient.Protocol.INTEGER)
          for ( i <- 0 until vv.length ) out.writeInt(vv(i))
        case "[Z" =>
          val vv = v.asInstanceOf[Array[Boolean]]
          out.writeInt(RClient.Protocol.VECTOR)
          out.writeInt(vv.length)
          out.writeInt(RClient.Protocol.BOOLEAN)
          // Booleans travel as 0/1 ints on the wire.
          for ( i <- 0 until vv.length ) out.writeInt(if ( vv(i) ) 1 else 0)
        case "[Ljava.lang.String;" =>
          val vv = v.asInstanceOf[Array[String]]
          out.writeInt(RClient.Protocol.VECTOR)
          out.writeInt(vv.length)
          out.writeInt(RClient.Protocol.STRING)
          for ( i <- 0 until vv.length ) RClient.writeString(out,vv(i))
        case _ =>
          throw new RException("Unsupported array type: "+c.getName)
      }
    } else {
      // Scalar path: boxed JVM wrappers only.  NOTE(review): java.lang.Double
      // is not handled here either.
      c.getName match {
        case "java.lang.Integer" =>
          out.writeInt(RClient.Protocol.ATOMIC)
          out.writeInt(RClient.Protocol.INTEGER)
          out.writeInt(v.asInstanceOf[Int])
        case "java.lang.Boolean" =>
          out.writeInt(RClient.Protocol.ATOMIC)
          out.writeInt(RClient.Protocol.BOOLEAN)
          out.writeInt(if (v.asInstanceOf[Boolean]) 1 else 0)
        case "java.lang.String" =>
          out.writeInt(RClient.Protocol.ATOMIC)
          out.writeInt(RClient.Protocol.STRING)
          RClient.writeString(out,v.asInstanceOf[String])
        case _ =>
          throw new RException("Unsupported non-array type: "+c.getName)
      }
    }
    out.flush()
    // Same acknowledgement handling as the null branch above.
    // NOTE(review): this status-check logic is duplicated and could be factored
    // into a private helper.
    if ( index != "" ) {
      val status = in.readInt()
      if ( status != RClient.Protocol.OK ) {
        val output = RClient.readString(in)
        if ( output != "" ) throw new RException(identifier + value.toString(), output, "Error setting")
        throw new RException("Error in R evaluation.")
      }
    }
  }

  /**
   * Fetches the R object bound to `identifier` over the wire protocol.
   *
   * The reads below must stay in lock-step with the writes performed by the
   * R side: structure code first, then (for atoms/vectors/matrices) the
   * element-type code, then the payload.  Do not reorder.
   *
   * @param identifier  name of the R-side variable to fetch
   * @param asReference if true, requests an opaque reference (GET_REFERENCE)
   *                    instead of a by-value transfer
   * @return pair (value, typeTag); typeTag is one of "Null", "Int", "Double",
   *         "Boolean", "String", "Array[...]"/"Array[Array[...]]" of those,
   *         or "RObject" when `asReference` is set
   * @throws RException for undefined identifiers, unsupported structures, or
   *                    unexpected protocol codes
   */
  def get(identifier: String, asReference: Boolean = false): (Any,String) = {
    logger.debug("Getting: "+identifier)
    out.writeInt(if ( asReference ) RClient.Protocol.GET_REFERENCE else RClient.Protocol.GET)
    RClient.writeString(out,identifier)
    out.flush()
    if ( asReference ) {
      val r = in.readInt() match {
        case RClient.Protocol.REFERENCE => (RObjectRef(RClient.readString(in)),"RObject")
        case RClient.Protocol.UNDEFINED_IDENTIFIER =>
          throw new RException("Undefined identifier")
      }
      return r
    }
    in.readInt match {
      case RClient.Protocol.NULLTYPE =>
        logger.debug("Getting null.")
        (null,"Null")
      case RClient.Protocol.ATOMIC =>
        logger.debug("Getting atomic.")
        in.readInt() match {
          case RClient.Protocol.INTEGER => (in.readInt(),"Int")
          case RClient.Protocol.DOUBLE => (in.readDouble(),"Double")
          case RClient.Protocol.BOOLEAN => (( in.readInt() != 0 ),"Boolean")
          case RClient.Protocol.STRING => (RClient.readString(in),"String")
          case _ => throw new RException("Protocol error")
        }
      case RClient.Protocol.VECTOR =>
        logger.debug("Getting vector...")
        val length = in.readInt()
        logger.debug("... of length: "+length)
        in.readInt() match {
          case RClient.Protocol.INTEGER => (Array.fill(length) { in.readInt() },"Array[Int]")
          case RClient.Protocol.DOUBLE => (Array.fill(length) { in.readDouble() },"Array[Double]")
          case RClient.Protocol.BOOLEAN => (Array.fill(length) { ( in.readInt() != 0 ) },"Array[Boolean]")
          case RClient.Protocol.STRING => (Array.fill(length) { RClient.readString(in) },"Array[String]")
          case _ => throw new RException("Protocol error")
        }
      case RClient.Protocol.MATRIX =>
        logger.debug("Getting matrix...")
        // Dimensions first (nrow, ncol), then element type, then elements
        // grouped row by row.
        val nrow = in.readInt()
        val ncol = in.readInt()
        logger.debug("... of dimensions: "+nrow+","+ncol)
        in.readInt() match {
          case RClient.Protocol.INTEGER => (Array.fill(nrow) { Array.fill(ncol) { in.readInt() } },"Array[Array[Int]]")
          case RClient.Protocol.DOUBLE => (Array.fill(nrow) { Array.fill(ncol) { in.readDouble() } },"Array[Array[Double]]")
          case RClient.Protocol.BOOLEAN => (Array.fill(nrow) { Array.fill(ncol) { ( in.readInt() != 0 ) } },"Array[Array[Boolean]]")
          case RClient.Protocol.STRING => (Array.fill(nrow) { Array.fill(ncol) { RClient.readString(in) } },"Array[Array[String]]")
          case _ => throw new RException("Protocol error")
        }
      case RClient.Protocol.UNDEFINED_IDENTIFIER => throw new RException("Undefined identifier")
      case RClient.Protocol.UNSUPPORTED_STRUCTURE => throw new RException("Unsupported data type")
      case _ => throw new RException("Protocol error")
    }
  }

  /**
   * Scalar-Int view of [[get]]: coerces scalars — and the FIRST element of
   * vectors — to Int.  Booleans map to 1/0; strings parse via toInt (throws
   * NumberFormatException on non-numeric text).
   */
  def getI0(identifier: String): Int = get(identifier) match {
    case (a,"Int") => a.asInstanceOf[Int]
    case (a,"Double") => a.asInstanceOf[Double].toInt
    case (a,"Boolean") => if (a.asInstanceOf[Boolean]) 1 else 0
    case (a,"String") => a.asInstanceOf[String].toInt
    case (a,"Array[Int]") => a.asInstanceOf[Array[Int]](0)
    case (a,"Array[Double]") => a.asInstanceOf[Array[Double]](0).toInt
    case (a,"Array[Boolean]") => if ( a.asInstanceOf[Array[Boolean]](0) ) 1 else 0
    case (a,"Array[String]") => a.asInstanceOf[Array[String]](0).toInt
    case (_,tp) => throw new RException(s"Unable to cast ${tp} to Int")
  }

  /** Scalar-Double view of [[get]]; same first-element convention as getI0. */
  def getD0(identifier: String): Double = get(identifier) match {
    case (a,"Int") => a.asInstanceOf[Int].toDouble
    case (a,"Double") => a.asInstanceOf[Double]
    case (a,"Boolean") => if (a.asInstanceOf[Boolean]) 1.0 else 0.0
    case (a,"String") => a.asInstanceOf[String].toDouble
    case (a,"Array[Int]") => a.asInstanceOf[Array[Int]](0).toDouble
    case (a,"Array[Double]") => a.asInstanceOf[Array[Double]](0)
    case (a,"Array[Boolean]") => if ( a.asInstanceOf[Array[Boolean]](0) ) 1.0 else 0.0
    case (a,"Array[String]") => a.asInstanceOf[Array[String]](0).toDouble
    case (_,tp) => throw new RException(s"Unable to cast ${tp} to Double")
  }

  /**
   * Scalar-Boolean view of [[get]].  Strings are true unless they equal
   * "false" case-insensitively.
   * NOTE(review): unlike getI0/getD0 there is no "Double" case here, so a
   * double-valued identifier throws instead of coercing — confirm intended.
   */
  def getB0(identifier: String): Boolean = get(identifier) match {
    case (a,"Int") => a.asInstanceOf[Int] != 0
    case (a,"Boolean") => a.asInstanceOf[Boolean]
    case (a,"String") => a.asInstanceOf[String].toLowerCase != "false"
    case (a,"Array[Int]") => a.asInstanceOf[Array[Int]](0) != 0
    case (a,"Array[Boolean]") => a.asInstanceOf[Array[Boolean]](0)
    case (a,"Array[String]") => a.asInstanceOf[Array[String]](0).toLowerCase != "false"
    case (_,tp) => throw new RException(s"Unable to cast ${tp} to Boolean")
  }

  /**
   * Scalar-String view of [[get]].
   * NOTE(review): no "Double"/"Array[Double]" case — doubles throw here too.
   */
  def getS0(identifier: String): String = get(identifier) match {
    case (a,"Int") => a.asInstanceOf[Int].toString
    case (a,"Boolean") => a.asInstanceOf[Boolean].toString
    case (a,"String") => a.asInstanceOf[String]
    case (a,"Array[Int]") => a.asInstanceOf[Array[Int]](0).toString
    case (a,"Array[Boolean]") => a.asInstanceOf[Array[Boolean]](0).toString
    case (a,"Array[String]") => a.asInstanceOf[Array[String]](0)
    case (_,tp) => throw new RException(s"Unable to cast ${tp} to String")
  }

  /**
   * Array[Int] view of [[get]]; scalars become one-element arrays.
   * NOTE(review): no Double handling, inconsistent with getI0 — confirm.
   */
  def getI1(identifier: String): Array[Int] = get(identifier) match {
    case (a,"Int") => Array(a.asInstanceOf[Int])
    case (a,"Boolean") => Array(if (a.asInstanceOf[Boolean]) 1 else 0)
    case (a,"String") => Array(a.asInstanceOf[String].toInt)
    case
// (continuation of getI1's match — the `case` keyword closes the previous span)
(a,"Array[Int]") => a.asInstanceOf[Array[Int]]
    case (a,"Array[Boolean]") => a.asInstanceOf[Array[Boolean]].map(x => if (x) 1 else 0)
    case (a,"Array[String]") => a.asInstanceOf[Array[String]].map(_.toInt)
    case (_,tp) => throw new RException(s"Unable to cast ${tp} to Array[Int]")
  }

  /**
   * Array[Boolean] view of [[get]]; scalars become one-element arrays.
   * Ints map via != 0; strings are true unless equal to "false"
   * (case-insensitively).
   * NOTE(review): like getI1/getS1, no "Double"/"Array[Double]" case.
   */
  def getB1(identifier: String): Array[Boolean] = get(identifier) match {
    case (a,"Int") => Array(a.asInstanceOf[Int] != 0)
    case (a,"Boolean") => Array(a.asInstanceOf[Boolean])
    case (a,"String") => Array(a.asInstanceOf[String].toLowerCase != "false")
    case (a,"Array[Int]") => a.asInstanceOf[Array[Int]].map(_ != 0)
    case (a,"Array[Boolean]") => a.asInstanceOf[Array[Boolean]]
    case (a,"Array[String]") => a.asInstanceOf[Array[String]].map(_.toLowerCase != "false")
    case (_,tp) => throw new RException(s"Unable to cast ${tp} to Array[Boolean]")
  }

  /** Array[String] view of [[get]]; scalars become one-element arrays. */
  def getS1(identifier: String): Array[String] = get(identifier) match {
    case (a,"Int") => Array(a.asInstanceOf[Int].toString)
    case (a,"Boolean") => Array(a.asInstanceOf[Boolean].toString)
    case (a,"String") => Array(a.asInstanceOf[String])
    case (a,"Array[Int]") => a.asInstanceOf[Array[Int]].map(_.toString)
    case (a,"Array[Boolean]") => a.asInstanceOf[Array[Boolean]].map(_.toString)
    case (a,"Array[String]") => a.asInstanceOf[Array[String]]
    case (_,tp) => throw new RException(s"Unable to cast ${tp} to Array[String]")
  }

  /** Fetches `identifier` as an opaque R-side reference ([[RObjectRef]]). */
  def getR(identifier: String): RObjectRef = get(identifier,true) match {
    case (a,"RObject") => a.asInstanceOf[RObjectRef]
    case (_,tp) => throw new RException(s"Unable to cast ${tp} to RObject")
  }

  /** Sends the GC op-code to the R-side server (fire-and-forget, no reply
      is read here). */
  def gc(): Unit = {
    logger.debug("Sending GC request.")
    out.writeInt(RClient.Protocol.GC)
    out.flush()
  }



}

/** Companion object: wire-protocol constants, string (de)serialization,
  * socket/process plumbing and RClient factories. */
object RClient {

  /** Integer op-codes shared with the R side of the bridge; the two ends
    * must agree on these values exactly — do not renumber. */
  object Protocol {

    // Data Types
    val UNSUPPORTED_TYPE = 0
    val INTEGER = 1
    val DOUBLE = 2
    val BOOLEAN = 3
    val STRING = 4
    val DATE = 5
    val DATETIME = 6

    // Data Structures
    val UNSUPPORTED_STRUCTURE = 10
    val NULLTYPE = 11
    val REFERENCE = 12
    val ATOMIC = 13
    val VECTOR = 14
    val MATRIX = 15
    val LIST = 16
    val DATAFRAME = 17
    val S3CLASS = 18
    val S4CLASS = 19
    val JOBJ = 20

    // Commands
    val EXIT = 100
    val RESET = 101
    val GC = 102
    val DEBUG = 103
    val EVAL = 104
    val SET = 105
    val SET_SINGLE = 106
    val SET_DOUBLE = 107
    val GET = 108
    val GET_REFERENCE = 109
    val DEF = 110
    val INVOKE = 111
    val SCALAP = 112

    // Result
    val OK = 1000
    val ERROR = 1001
    val UNDEFINED_IDENTIFIER = 1002

    // Misc.
    val CURRENT_SUPPORTED_SCALA_VERSION = "2.10"

  }

  /** Writes `string` as a 4-byte length prefix followed by its UTF-8 bytes;
    * [[readString]] is the exact inverse. */
  def writeString(out: DataOutputStream, string: String): Unit = {
    val bytes = string.getBytes("UTF-8")
    val length = bytes.length
    out.writeInt(length)
    out.write(bytes,0,length)
  }

  /** Reads one length-prefixed UTF-8 string as written by [[writeString]]
    * (or its R-side counterpart). */
  def readString(in: DataInputStream): String = {
    val length = in.readInt()
    val bytes = new Array[Byte](length)
    in.readFully(bytes)
    new String(bytes,"UTF-8")
  }

  /** True iff `x` is rectangular (every row has the length of row 0).
    * An empty outer array counts as a matrix. */
  def isMatrix[T](x: Array[Array[T]]): Boolean = {
    if ( x.length != 0 ) {
      val len = x(0).length
      for ( i <- 1 until x.length ) {
        if ( x(i).length != len ) return false
      }
    }
    true
  }

  import scala.sys.process._
  private val logger: Logger = LoggerFactory.getLogger(getClass)

  // Normalized OS tag derived from the os.name system property.
  // NOTE(review): names starting with "unix" are mapped to "linux" here, so
  // the "unix" branches in defaultArguments/defaultRCmd below are unreachable
  // dead code.
  val OS = sys.props("os.name").toLowerCase match {
    case s if s.startsWith("""windows""") => "windows"
    case s if s.startsWith("""linux""") => "linux"
    case s if s.startsWith("""unix""") => "linux"
    case s if s.startsWith("""mac""") => "macintosh"
    case _ => throw new RException("Unrecognized OS")
  }

  // Flags handed to the R executable at launch; note Windows uses --ess where
  // the other platforms use --interactive.
  val defaultArguments = OS match {
    case "windows" => Array[String]("--vanilla","--silent","--slave","--ess")
    case "linux" => Array[String]("--vanilla","--silent","--slave","--interactive")
    case "unix" => Array[String]("--vanilla","--silent","--slave","--interactive")
    case "macintosh" => Array[String]("--vanilla","--silent","--slave","--interactive")
  }

  // R executable per OS; lazy because findROnWindows shells out to `reg`.
  lazy val defaultRCmd = OS match {
    case "windows" => findROnWindows
    case "linux" => """R"""
// (continuation of `defaultRCmd` — the match opens on the previous span;
// the "unix" branch is unreachable because RClient.OS never yields "unix")
case "unix" => """R"""
    case "macintosh" => """R"""
  }

  /**
   * Locates R.exe through the Windows registry: queries
   * Software\R-core\R\InstallPath under HKLM, then HKCU, keeping the last
   * root that answers.
   *
   * @return absolute path to bin\R.exe
   * @throws RException if neither registry root yields an InstallPath
   */
  def findROnWindows: String = {
    val NEWLINE = sys.props("line.separator")
    var result : String = null
    for ( root <- List("HKEY_LOCAL_MACHINE","HKEY_CURRENT_USER") ) {
      val out = new StringBuilder()
      // Capture stdout only; stderr is discarded.
      val logger = ProcessLogger((o: String) => { out.append(o); out.append(NEWLINE) },(e: String) => {})
      try {
        // `!` (scala.sys.process) runs the command synchronously.
        ("reg query \"" + root + "\\Software\\R-core\\R\" /v \"InstallPath\"") ! logger
        val a = out.toString.split(NEWLINE).filter(_.matches("""^\s*InstallPath\s*.*"""))(0)
        result = a.split("REG_SZ")(1).trim() + """\bin\R.exe"""
      } catch {
        // Best effort: a missing key under one root just falls through to the next.
        case _ : Throwable =>
      }
    }
    if ( result == null ) throw new RException("Cannot locate R using Windows registry.")
    else return result
  }

  /** Drains `input` line-by-line into the debug log, each line prefixed with
    * `label`; closes the stream when the producer ends it. */
  def reader(label: String)(input: InputStream) = {
    val in = new BufferedReader(new InputStreamReader(input))
    var line = in.readLine()
    while ( line != null ) {
      logger.debug(label+line)
      line = in.readLine()
    }
    in.close()
  }

  /**
   * Opens two loopback-only server sockets on ephemeral ports (Scala->R and
   * R->Scala), publishes the chosen port numbers to `portsFilename` so the R
   * process can connect, then blocks in accept() — first on serverIn, then on
   * serverOut.  The R side must therefore connect in that order.
   */
  class ScalaSockets(portsFilename: String) {
    private val logger: Logger = LoggerFactory.getLogger(getClass)

    // InetAddress.getByName(null) binds the loopback interface only.
    val serverIn = new ServerSocket(0,0,InetAddress.getByName(null))
    val serverOut = new ServerSocket(0,0,InetAddress.getByName(null))

    locally {
      logger.info("Trying to open ports filename: "+portsFilename)
      val portNumberFile = new File(portsFilename)
      val p = new PrintWriter(portNumberFile)
      p.println(serverIn.getLocalPort+" "+serverOut.getLocalPort)
      p.close()
      logger.info("Servers are running on port "+serverIn.getLocalPort+" "+serverOut.getLocalPort)
    }

    val socketIn = serverIn.accept
    logger.info("serverinaccept done")
    val in = new DataInputStream(new BufferedInputStream(socketIn.getInputStream))
    logger.info("in has been created")
    val socketOut = serverOut.accept
    logger.info("serverouacceptdone")
    val out = new DataOutputStream(new BufferedOutputStream(socketOut.getOutputStream))
    logger.info("out is done")
  }

  /** Factory indirection for [[ScalaSockets]] (overridable in tests). */
  def makeSockets(portsFilename : String) = new ScalaSockets(portsFilename)

  /** Creates an [[RClient]] using the default R executable for this OS. */
  def apply(): RClient = apply(defaultRCmd)

  /**
   * Launches an R process, bootstraps the rscala server inside it, and wires
   * the two sockets up into an [[RClient]].
   *
   * @param rCmd    path/name of the R executable
   * @param libdir  NOTE(review): currently unused; kept for interface
   *                compatibility
   * @param debug   passed through to the R-side rServe as TRUE/FALSE
   * @param timeout passed through to the R-side socket setup (seconds)
   * @throws RException if the R package version does not match this build
   */
  def apply(rCmd: String, libdir : String = "",debug: Boolean = false, timeout: Int = 60): RClient = {
    logger.debug("Creating processIO")
    var cmd: PrintWriter = null
    val command = rCmd +: defaultArguments
    val processCmd = Process(command)

    val processIO = new ProcessIO(
      o => { cmd = new PrintWriter(o) },
      reader("STDOUT DEBUG: "),
      reader("STDERR DEBUG: "),
      true
    )
    val portsFile = File.createTempFile("rscala-","")
    val processInstance = processCmd.run(processIO)
    // BUG FIX: was `replaceAll(File.separator,"/")`.  String.replaceAll treats
    // its first argument as a REGEX; on Windows File.separator is "\", an
    // invalid pattern that throws PatternSyntaxException.  String.replace does
    // a literal substitution — identical result on Unix, correct on Windows.
    val snippet = s"""
rscala:::rServe(rscala:::newSockets('${portsFile.getAbsolutePath.replace(File.separator,"/")}',debug=${if ( debug ) "TRUE" else "FALSE"},timeout=${timeout}))
q(save='no')
  """
    // NOTE(review): unbounded spin waiting for the process's stdin writer to be
    // installed by processIO; if R fails to launch this loops forever.
    while ( cmd == null ) Thread.sleep(100)
    logger.info("sending snippet " + snippet)
    cmd.println(snippet)
    cmd.flush()
    val sockets = makeSockets(portsFile.getAbsolutePath)
    sockets.out.writeInt(Protocol.OK)
    sockets.out.flush()
    try {
      // Handshake: R sends its package version string; it must equal ours.
      // NOTE(review): relies on `assert`, which is disabled under -Xdisable-assertions.
      assert( readString(sockets.in) == org.apache.zeppelin.rinterpreter.rscala.Version )
    } catch {
      case _: Throwable => throw new RException("The scala and R versions of the package don't match")
    }
    apply(sockets.in,sockets.out)
  }

  /** __For rscala developers only__: Returns an instance of the [[RClient]] class.
*/ + def apply(in: DataInputStream, out: DataOutputStream): RClient = new RClient(in,out) + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/RException.scala ---------------------------------------------------------------------- diff --git a/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/RException.scala b/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/RException.scala new file mode 100644 index 0000000..43d129d --- /dev/null +++ b/r/src/main/scala/org/apache/zeppelin/rinterpreter/rscala/RException.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.rinterpreter.rscala + +import org.apache.zeppelin.interpreter.InterpreterResult + +class RException(val snippet : String, val error : String, val message : String = "") extends Exception { + + def this(snippet : String) = this(snippet, "") + + def getInterpreterResult : InterpreterResult = new + InterpreterResult(InterpreterResult.Code.ERROR, message + "\n" + snippet + "\n" + error) + + def getInterpreterResult(st : String) : InterpreterResult = new + InterpreterResult(InterpreterResult.Code.ERROR, message + "\n" + st + "\n" + error) +}
