Repository: incubator-zeppelin Updated Branches: refs/heads/master 45ce8a288 -> 8fdaaba94
Add a (local mode) Scalding Interpreter to Zeppelin

### What is this PR for?
Scalding (https://github.com/twitter/scalding) is a Scala library for writing MapReduce jobs. This issue tracks the addition of a Scalding interpreter for Zeppelin. To keep this work incremental, this PR will focus on just a local mode implementation. The Hadoop mode can be a subsequent addition.

### What type of PR is it?
Feature

### Todos
* Addition of Hadoop mode for Scalding

### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-526

### How should this be tested?
Run the tests in: scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java

### Screenshots
<img width="1167" alt="scalding-example" src="https://cloud.githubusercontent.com/assets/1509691/11944979/8788d5c2-a7ff-11e5-9863-2a1216c51896.png">
<img width="1151" alt="scalding-screenshot" src="https://cloud.githubusercontent.com/assets/1509691/11944978/8787e3ec-a7ff-11e5-8383-456adb16b977.png">

### Questions:
* This could use documentation, which could just be the example in the screenshot. Where can I contribute that?
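
Reviewer note on multi-line cells: `ScaldingInterpreter.interpretInput` in this change holds back a line when the next line starts with `.` (but not `..` or `./`), so a chained call split across lines reaches the REPL as one statement. The standalone sketch below illustrates only that grouping rule. It is not the PR's code: `ContinuationSketch` and `groupStatements` are hypothetical names, and the real method additionally buffers input that the Scala REPL reports as incomplete.

```java
import java.util.ArrayList;
import java.util.List;

public class ContinuationSketch {

    // Hypothetical helper (not part of the PR): groups physical lines into
    // statements. A line is held back and joined with the following one when
    // the NEXT line starts with "." but not with ".." or "./".
    public static List<String> groupStatements(String[] lines) {
        List<String> statements = new ArrayList<>();
        StringBuilder pending = new StringBuilder();
        for (int i = 0; i < lines.length; i++) {
            pending.append(lines[i]);
            boolean nextContinues = false;
            if (i + 1 < lines.length) {
                String next = lines[i + 1].trim();
                nextContinues = next.startsWith(".")
                        && !next.startsWith("..")
                        && !next.startsWith("./");
            }
            if (nextContinues) {
                // Keep accumulating; the statement is not finished yet.
                pending.append("\n");
            } else {
                // Statement complete; this is what would be sent to the REPL.
                statements.add(pending.toString());
                pending.setLength(0);
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        String[] cell = {
            "val top10 = wordCount",
            "    .groupAll",
            "    .take(10)",
            "top10.dump"
        };
        for (String statement : groupStatements(cell)) {
            System.out.println("[" + statement.replace("\n", "\\n") + "]");
        }
    }
}
```

Applied to the `top10` snippet from the new docs page, this grouping yields two statements: the chained `val top10 = ...` expression and `top10.dump`.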
Author: Sriram Krishnan <[email protected]>

Closes #561 from sriramkrishnan/scalding and squashes the following commits:

ffa698b [Sriram Krishnan] Whitespace cleanup
1ad405b [Sriram Krishnan] Adding newline to remove redundant change in PR
8eec3c2 [Sriram Krishnan] Updating docs to include the -Pscalding profile for Scalding
006500d [Sriram Krishnan] Reverting all commits to LICENSE to be back to master
bc31d1e [Sriram Krishnan] Getting rid of added licenses
b30725f [Sriram Krishnan] Making the Scalding interpreter optional as part of a new -Pscalding profile
9a7d733 [Sriram Krishnan] Moved tukanni license to a separate section and added license
dd0bb9a [Sriram Krishnan] Changing licenses to text format
aaae5d1 [Sriram Krishnan] Moving licenses to the right location
8be8d22 [Sriram Krishnan] Went thru and added all licenses I could find
6019cc8 [Sriram Krishnan] More licenses. Only remaining ones are the dependencies of hadoop-common.
460658a [Sriram Krishnan] Adding Cascading dependencies
dd8a4c8 [Sriram Krishnan] Adding Scalding licenses
083f059 [Sriram Krishnan] Trimming deps down from hadoop-client to just hadoop-common. Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode.
5c8056c [Sriram Krishnan] Making the Scalding scala jars same as the Spark ones for consistency
d4cf308 [Sriram Krishnan] Adding docs for the Scalding interpreter
8004b39 [Sriram Krishnan] Fixing a typo
91b0692 [Sriram Krishnan] adding ScaldingInterpreter
5fd1ae4 [Sriram Krishnan] Address comments on PR. Merge remote-tracking branch 'upstream/master' into scalding
7a9ceeb [Sriram Krishnan] Adding some tests for the Scalding interpreter
7ec2941 [Sriram Krishnan] More code cleanup
368dc04 [Sriram Krishnan] Cleaning up imports, comments, etc
c27ec48 [Sriram Krishnan] Formatting, license
8944b0c [Sriram Krishnan] Merge remote-tracking branch 'upstream/master' into scalding
36a2dac [Sriram Krishnan] Added a link to the scalding code where the ILoop was lifted from.
d3916b7 [Sriram Krishnan] Adding modified version of ScaldingILoop for grabbing Console output - will want to move this to Scalding itself
1ffbb3b [Sriram Krishnan] Fixing output of stdout from console
b19fda4 [Sriram Krishnan] More cleanup - flushing output stream. Still can't seem to get the Scala console output. Need to figure that out.
e13576f [Sriram Krishnan] Now seem to be getting the console out, but only for last line. Will need some debugging.
35fc032 [Sriram Krishnan] Initial version of a ScaldingInterpreter running in local mode. Need to get console output next. And add tests, and make it work for HDFS.
721dcb7 [Sriram Krishnan] Getting a basic interpreter going. Next step is to hook in the Scalding REPL.

Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/8fdaaba9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/8fdaaba9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/8fdaaba9

Branch: refs/heads/master
Commit: 8fdaaba94584ec5c9b8b5d8acb96016700720af1
Parents: 45ce8a2
Author: Sriram Krishnan <[email protected]>
Authored: Wed Dec 30 22:25:49 2015 -0800
Committer: Lee moon soo <[email protected]>
Committed: Fri Jan 1 19:08:07 2016 -0800

----------------------------------------------------------------------
 README.md                                       |   5 +
 conf/zeppelin-site.xml.template                 |   2 +-
 docs/_includes/themes/zeppelin/_navigation.html |   1 +
 .../docs-img/scalding-InterpreterBinding.png    | Bin 0 -> 13029 bytes
 .../docs-img/scalding-InterpreterSelection.png  | Bin 0 -> 24926 bytes
 .../zeppelin/img/docs-img/scalding-pie.png      | Bin 0 -> 98697 bytes
 docs/docs.md                                    |   1 +
 docs/interpreter/scalding.md                    |  78 +++++
 pom.xml                                         |   7 +
 scalding/pom.xml                                | 202 +++++++++++++
 .../zeppelin/scalding/ScaldingInterpreter.java  | 288 +++++++++++++++++++
 .../zeppelin/scalding/ScaldingILoop.scala       | 111 +++++++
 .../scalding/ScaldingInterpreterTest.java       | 130 +++++++++
.../zeppelin/conf/ZeppelinConfiguration.java | 3 +- 14 files changed, 826 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 85fc0b6..9a99b78 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,11 @@ mvn clean package -Pspark-1.5 -Pmapr50 -DskipTests mvn clean package -Dignite.version=1.1.0-incubating -DskipTests ``` +#### Scalding Interpreter + +``` +mvn clean package -Pscalding -DskipTests +``` ### Configure If you wish to configure Zeppelin option (like port number), configure the following files: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index b6aca75..74fa2e7 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -105,7 +105,7 @@ <property> <name>zeppelin.interpreters</name> - 
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter</value> + <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter</value> <description>Comma separated interpreter configurations. 
First interpreter become a default</description> </property> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/_includes/themes/zeppelin/_navigation.html ---------------------------------------------------------------------- diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index 2c62282..5b2da51 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -45,6 +45,7 @@ <li><a href="{{BASE_PATH}}/interpreter/lens.html">Lens</a></li> <li><a href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li> <li><a href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, hawq</a></li> + <li><a href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li> <li><a href="{{BASE_PATH}}/pleasecontribute.html">Shell</a></li> <li><a href="{{BASE_PATH}}/interpreter/spark.html">Spark</a></li> <li><a href="{{BASE_PATH}}/pleasecontribute.html">Tajo</a></li> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png ---------------------------------------------------------------------- diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png new file mode 100644 index 0000000..1131310 Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png differ http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png ---------------------------------------------------------------------- diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png new file mode 100644 index 0000000..c52f4e3 Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png differ http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png ---------------------------------------------------------------------- diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png new file mode 100644 index 0000000..bb01025 Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png differ http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/docs.md ---------------------------------------------------------------------- diff --git a/docs/docs.md b/docs/docs.md index a2347a8..b70ee58 100644 --- a/docs/docs.md +++ b/docs/docs.md @@ -41,6 +41,7 @@ limitations under the License. * [lens](./interpreter/lens.html) * [md](./interpreter/markdown.html) * [postgresql, hawq](./interpreter/postgresql.html) +* [scalding](./interpreter/scalding.html) * [sh](./pleasecontribute.html) * [spark](./interpreter/spark.html) * [tajo](./pleasecontribute.html) http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/interpreter/scalding.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md new file mode 100644 index 0000000..40ec8b1 --- /dev/null +++ b/docs/interpreter/scalding.md @@ -0,0 +1,78 @@ +--- +layout: page +title: "Scalding Interpreter" +description: "" +group: manual +--- +{% include JB/setup %} + + +## Scalding Interpreter for Apache Zeppelin +[Scalding](https://github.com/twitter/scalding) is an open source Scala library for writing MapReduce jobs. 
+
+### Building the Scalding Interpreter
+You have to first build the Scalding interpreter by enabling the **scalding** profile as follows:
+
+```
+mvn clean package -Pscalding -DskipTests
+```
+
+### Enabling the Scalding Interpreter
+
+In a notebook, to enable the **Scalding** interpreter, click on the **Gear** icon, select **Scalding**, and hit **Save**.
+
+ <center>
+
+
+
+ </center>
+
+### Configuring the Interpreter
+Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything.
+
+### Testing the Interpreter
+
+As an example, using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!) and plot a graph of the top 10 words in the book.
+
+```
+%scalding
+
+import scala.io.Source
+
+// Get the Alice in Wonderland book from gutenberg.org:
+val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines
+val aliceLineNum = alice.zipWithIndex.toList
+val alicePipe = TypedPipe.from(aliceLineNum)
+
+// Now get a list of words for the book:
+val aliceWords = alicePipe.flatMap { case (text, _) => text.split("\\s+").toList }
+
+// Now let's add a count for each word:
+val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word => (word, 1L) }
+
+// let's sum them for each word:
+val wordCount = aliceWithCount.group.sum
+
+print ("Here are the top 10 words\n")
+val top10 = wordCount
+    .groupAll
+    .sortBy { case (word, count) => -count }
+    .take(10)
+top10.dump
+
+```
+```
+%scalding
+
+val table = "words\t count\n" + top10.toIterator.map{case (k, (word, count)) => s"$word\t$count"}.mkString("\n")
+print("%table " + table)
+
+```
+
+If you click on the icon for the pie chart, you should be able to see a chart like this:
+
+
+### Current Status & Future Work
+The current implementation of the Scalding interpreter does not support canceling jobs or fine-grained progress updates.
+ +The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!). \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5e492fa..88d38aa 100755 --- a/pom.xml +++ b/pom.xml @@ -628,6 +628,13 @@ </profile> <profile> + <id>scalding</id> + <modules> + <module>scalding</module> + </modules> + </profile> + + <profile> <id>build-distr</id> <activation> <activeByDefault>false</activeByDefault> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/pom.xml ---------------------------------------------------------------------- diff --git a/scalding/pom.xml b/scalding/pom.xml new file mode 100644 index 0000000..abc1e2b --- /dev/null +++ b/scalding/pom.xml @@ -0,0 +1,202 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.6.0-incubating-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-scalding</artifactId> + <packaging>jar</packaging> + <version>0.6.0-incubating-SNAPSHOT</version> + <name>Zeppelin: Scalding interpreter</name> + <url>http://zeppelin.incubator.apache.org</url> + + <properties> + <scala.version>2.10.4</scala.version> + <hadoop.version>2.3.0</hadoop.version> + <scalding.version>0.15.1-RC13</scalding.version> + </properties> + + <repositories> + <repository> + <id>conjars</id> + <name>Concurrent Maven Repo</name> + <url>http://conjars.org/repo</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-exec</artifactId> + <version>1.3</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>scalding-core_2.10</artifactId> + <version>${scalding.version}</version> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>scalding-repl_2.10</artifactId> + <version>${scalding.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + 
<artifactId>scala-compiler</artifactId> + <version>${scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + </dependency> + + <!-- Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.7</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/scalding</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + </configuration> + </execution> + <execution> + <id>copy-artifact</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/scalding</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + 
<version>${project.version}</version> + <type>${project.packaging}</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + <!-- Plugin to compile Scala code --> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>test-compile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test-compile</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java ---------------------------------------------------------------------- diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java new file mode 100644 index 0000000..d43417e --- /dev/null +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.net.URL; +import java.net.URLClassLoader; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Console; +import scala.Some; +import scala.None; +import scala.tools.nsc.Settings; +import scala.tools.nsc.settings.MutableSettings.BooleanSetting; +import scala.tools.nsc.settings.MutableSettings.PathSetting; + +/** + * Scalding interpreter for Zeppelin. Based off the Spark interpreter code. 
+ * + */ +public class ScaldingInterpreter extends Interpreter { + Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class); + + public static final List<String> NO_COMPLETION = + Collections.unmodifiableList(new ArrayList<String>()); + + static { + Interpreter.register("scalding", ScaldingInterpreter.class.getName()); + } + + private ScaldingILoop interpreter; + private ByteArrayOutputStream out; + private Map<String, Object> binder; + + public ScaldingInterpreter(Properties property) { + super(property); + out = new ByteArrayOutputStream(); + } + + @Override + public void open() { + URL[] urls = getClassloaderUrls(); + + // Very nice discussion about how scala compiler handle classpath + // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0 + + /* + * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new + * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through + * nsc.Settings.classpath. + * + * >> val settings = new Settings() >> settings.usejavacp.value = true >> + * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >> + * val in = new Interpreter(settings) { >> override protected def parentClassLoader = + * getClass.getClassLoader >> } >> in.setContextClassLoader() + */ + Settings settings = new Settings(); + + // set classpath for scala compiler + PathSetting pathSettings = settings.classpath(); + String classpath = ""; + List<File> paths = currentClassPath(); + for (File f : paths) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += f.getAbsolutePath(); + } + + if (urls != null) { + for (URL u : urls) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += u.getFile(); + } + } + + pathSettings.v_$eq(classpath); + settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); + + + // set classloader for scala compiler + 
settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread() + .getContextClassLoader())); + BooleanSetting b = (BooleanSetting) settings.usejavacp(); + b.v_$eq(true); + settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); + + /* Scalding interpreter */ + PrintStream printStream = new PrintStream(out); + interpreter = new ScaldingILoop(null, new PrintWriter(out)); + interpreter.settings_$eq(settings); + interpreter.createInterpreter(); + + interpreter.intp(). + interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + binder = (Map<String, Object>) getValue("_binder"); + binder.put("out", printStream); + } + + private Object getValue(String name) { + Object ret = interpreter.intp().valueOfTerm(name); + if (ret instanceof None) { + return null; + } else if (ret instanceof Some) { + return ((Some) ret).get(); + } else { + return ret; + } + } + + private List<File> currentClassPath() { + List<File> paths = classPath(Thread.currentThread().getContextClassLoader()); + String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); + if (cps != null) { + for (String cp : cps) { + paths.add(new File(cp)); + } + } + return paths; + } + + private List<File> classPath(ClassLoader cl) { + List<File> paths = new LinkedList<File>(); + if (cl == null) { + return paths; + } + + if (cl instanceof URLClassLoader) { + URLClassLoader ucl = (URLClassLoader) cl; + URL[] urls = ucl.getURLs(); + if (urls != null) { + for (URL url : urls) { + paths.add(new File(url.getFile())); + } + } + } + return paths; + } + + @Override + public void close() { + interpreter.intp().close(); + } + + + @Override + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + logger.info("Running Scalding command '" + cmd + "'"); + + if (cmd == null || cmd.trim().length() == 0) { + return new InterpreterResult(Code.SUCCESS); + } + return interpret(cmd.split("\n"), contextInterpreter); + } + + 
public InterpreterResult interpret(String[] lines, InterpreterContext context) { + synchronized (this) { + InterpreterResult r = interpretInput(lines); + return r; + } + } + + public InterpreterResult interpretInput(String[] lines) { + + // add print("") to make sure not finishing with comment + // see https://github.com/NFLabs/zeppelin/issues/151 + String[] linesToRun = new String[lines.length + 1]; + for (int i = 0; i < lines.length; i++) { + linesToRun[i] = lines[i]; + } + linesToRun[lines.length] = "print(\"\")"; + + Console.setOut((java.io.PrintStream) binder.get("out")); + out.reset(); + Code r = null; + String incomplete = ""; + + for (int l = 0; l < linesToRun.length; l++) { + String s = linesToRun[l]; + // check if next line starts with "." (but not ".." or "./") it is treated as an invocation + if (l + 1 < linesToRun.length) { + String nextLine = linesToRun[l + 1].trim(); + if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) { + incomplete += s + "\n"; + continue; + } + } + + scala.tools.nsc.interpreter.Results.Result res = null; + try { + res = interpreter.intp().interpret(incomplete + s); + } catch (Exception e) { + logger.error("Interpreter exception: ", e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + + r = getResultCode(res); + + if (r == Code.ERROR) { + Console.flush(); + return new InterpreterResult(r, out.toString()); + } else if (r == Code.INCOMPLETE) { + incomplete += s + "\n"; + } else { + incomplete = ""; + } + } + + if (r == Code.INCOMPLETE) { + return new InterpreterResult(r, "Incomplete expression"); + } else { + Console.flush(); + return new InterpreterResult(r, out.toString()); + } + } + + private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { + if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { + return Code.SUCCESS; + } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) { + return Code.INCOMPLETE; + } else { + return 
Code.ERROR; + } + } + + @Override + public void cancel(InterpreterContext context) { + // not implemented + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + // fine-grained progress not implemented - return 0 + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + ScaldingInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List<String> completion(String buf, int cursor) { + return NO_COMPLETION; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala ---------------------------------------------------------------------- diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala new file mode 100644 index 0000000..bd23c49 --- /dev/null +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.scalding; + +import java.io.{BufferedReader, File, FileReader} + +import scala.tools.nsc.GenericRunnerSettings +import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter} + + +/** + * A class providing Scalding specific commands for inclusion in the Scalding REPL. + * This is currently forked from Scalding, but should eventually make it into Scalding itself: + * https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala + */ + class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter) + extends ILoop(in0, out) { + // def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) + // def this() = this(None, new JPrintWriter(Console.out, true)) + + settings = new GenericRunnerSettings({ s => echo(s) }) + + override def printWelcome() { + val fc = Console.YELLOW + val wc = Console.RED + def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc) + echo(fc + + " ( \n" + + " )\\ ) ( ( \n" + + "(()/( ) )\\ )\\ ) ( ( ( \n" + + " /(_)) ( ( /( ((_)(()/( )\\ ( )\\))( \n" + + "(_)) )\\ )( )) _ ((_)(( ) )\\ ) (( ))\\ \n".replaceAll("_", wc + "_" + fc) + wc + + wrapFlames("/ __|((_) ((_)_ | | _| | (_) _(_(( (_()_) \n") + + wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\ \n") + + "|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, | \n" + + " |___/ ") + } + + /** + * Commands specific to the Scalding REPL. To define a new command use one of the following + * factory methods: + * - `LoopCommand.nullary` for commands that take no arguments + * - `LoopCommand.cmd` for commands that take one string argument + * - `LoopCommand.varargs` for commands that take multiple string arguments + */ + private val scaldingCommands: List[LoopCommand] = List() + + /** + * Change the shell prompt to read scalding> + * + * @return a prompt string to use for this REPL. 
+   */
+  override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET
+
+  private[this] def addImports(ids: String*): IR.Result =
+    if (ids.isEmpty) IR.Success
+    else intp.interpret("import " + ids.mkString(", "))
+
+  /**
+   * Search for files with the given name in all directories from current directory
+   * up to root.
+   */
+  private def findAllUpPath(filename: String): List[File] =
+    Iterator.iterate(System.getProperty("user.dir"))(new File(_).getParent)
+      .takeWhile(_ != "/")
+      .flatMap(new File(_).listFiles.filter(_.toString.endsWith(filename)))
+      .toList
+
+  /**
+   * Gets the list of commands that this REPL supports.
+   *
+   * @return a list of the command supported by this REPL.
+   */
+  override def commands: List[LoopCommand] = super.commands ++ scaldingCommands
+
+  protected def imports: List[String] = List(
+    "com.twitter.scalding._",
+    "com.twitter.scalding.ReplImplicits._",
+    "com.twitter.scalding.ReplImplicitContext._",
+    "com.twitter.scalding.ReplState._")
+
+  override def createInterpreter() {
+    super.createInterpreter()
+    intp.beQuietDuring {
+      addImports(imports: _*)
+
+      settings match {
+        case s: GenericRunnerSettings =>
+          findAllUpPath(".scalding_repl").reverse.foreach {
+            f => s.loadfiles.appendToValue(f.toString)
+          }
+        case _ => ()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
new file mode 100644
index 0000000..7a753fa
--- /dev/null
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * Tests for the Scalding interpreter for Zeppelin.
+ *
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ScaldingInterpreterTest {
+  public static ScaldingInterpreter repl;
+  private InterpreterContext context;
+  private File tmpDir;
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
+    System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
+
+    tmpDir.mkdirs();
+
+    if (repl == null) {
+      Properties p = new Properties();
+
+      repl = new ScaldingInterpreter(p);
+      repl.open();
+    }
+
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    context = new InterpreterContext("note", "id", "title", "text",
+        new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
+            intpGroup.getId(), null),
+        new LinkedList<InterpreterContextRunner>());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    delete(tmpDir);
+    repl.close();
+  }
+
+  private void delete(File file) {
+    if (file.isFile()) file.delete();
+    else if (file.isDirectory()) {
+      File[] files = file.listFiles();
+      if (files != null && files.length > 0) {
+        for (File f : files) {
+          delete(f);
+        }
+      }
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testBasicIntp() {
+    assertEquals(InterpreterResult.Code.SUCCESS,
+        repl.interpret("val a = 1\nval b = 2", context).code());
+
+    // when interpret incomplete expression
+    InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
+    assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
+    assertTrue(incomplete.message().length() > 0); // expecting some error
+                                                   // message
+  }
+
+  @Test
+  public void testBasicScalding() {
+    assertEquals(InterpreterResult.Code.SUCCESS,
+        repl.interpret("case class Sale(state: String, name: String, sale: Int)\n" +
+            "val salesList = List(Sale(\"CA\", \"A\", 60), Sale(\"CA\", \"A\", 20), Sale(\"VA\", \"B\", 15))\n" +
+            "val salesPipe = TypedPipe.from(salesList)\n" +
+            "val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" +
+            " groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" +
+            "results.dump",
+            context).code());
+  }
+
+  @Test
+  public void testNextLineInvocation() {
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
+  }
+
+  @Test
+  public void testEndWithComment() {
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
+  }
+
+  @Test
+  public void testReferencingUndefinedVal() {
+    InterpreterResult result = repl.interpret("def category(min: Int) = {"
+        + " if (0 <= value) \"error\"" + "}", context);
+    assertEquals(Code.ERROR, result.code());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 87d1c20..ed3b8c0 100755
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -411,7 +411,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
         + "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
         + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
         + "org.apache.zeppelin.kylin.KylinInterpreter,"
-        + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter"),
+        + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
+        + "org.apache.zeppelin.scalding.ScaldingInterpreter"),
     ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
     ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
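
Reviewer note on `findAllUpPath` in ScaldingILoop.scala: the walk from `user.dir` towards the root stops at the literal string "/" and calls `listFiles` without a null check, so it may misbehave on platforms where the root is not "/" or on unreadable directories. The following is only a rough Java sketch of the same walk-up idea, not the code in this commit. The class name `UpPathSearch` and its method signature are hypothetical. Unlike the committed version, it also inspects the root directory itself and matches on the file name rather than the full path.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of this commit: illustrates a null-safe
// walk from a start directory up to the filesystem root.
final class UpPathSearch {

  /**
   * Collects files whose name ends with the given suffix, visiting the start
   * directory first and then each parent in turn (nearest directory first).
   */
  static List<File> findAllUpPath(File start, String suffix) {
    List<File> found = new ArrayList<>();
    // getParentFile() returns null once the root has been visited,
    // which ends the loop on any platform.
    for (File dir = start.getAbsoluteFile(); dir != null; dir = dir.getParentFile()) {
      File[] entries = dir.listFiles(); // null if unreadable or not a directory
      if (entries == null) {
        continue;
      }
      for (File f : entries) {
        if (f.getName().endsWith(suffix)) {
          found.add(f);
        }
      }
    }
    return found;
  }

  public static void main(String[] args) {
    // Prints every .scalding_repl file found between user.dir and the root.
    File cwd = new File(System.getProperty("user.dir"));
    for (File f : findAllUpPath(cwd, ".scalding_repl")) {
      System.out.println(f);
    }
  }
}
```

The result is ordered nearest-first. A caller that wants outermost files loaded first, as `createInterpreter` does with `.reverse`, would reverse the list before use.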
