Repository: incubator-zeppelin Updated Branches: refs/heads/master 43baa0af4 -> 96dbc6656
[ZEPPELIN-840] Scalding interpreter that works in hdfs mode ### What is this PR for? Scalding interpreter that works in hdfs mode ### What type of PR is it? Improvement ### Todos * [x] - Update documentation. ### What is the Jira issue? [ZEPPELIN-840](https://issues.apache.org/jira/browse/ZEPPELIN-840) ### How should this be tested? 1. The remote interpreter has to be run on a system with Hadoop libraries. 1. Run "%scalding mode" and verify that the mode is Hdfs. 1. Run a command that creates a map-reduce job. For example, "TypedPipe.from(TextLine("/user/pwagle/testfile")).filter(x => x == "a").toList" ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? Yes Author: Prasad Wagle <[email protected]> Closes #917 from prasadwagle/ZEPPELIN-840 and squashes the following commits: e91efd1 [Prasad Wagle] Restore document section on how to build scalding interpreter by enabling scalding profile a001660 [Prasad Wagle] Restore scalding profile f59bb55 [Prasad Wagle] Revert scala version change in zeppelin-server/pom.xml 730d5b6 [Prasad Wagle] Remove scalding profile from pom.xml d74c52c [Prasad Wagle] Remove -Pscalding from .travis.yml PROFILE variable 92bfcca [Prasad Wagle] Fix checkstyle error a29867d [Prasad Wagle] Update scalding interpreter doc 5db8daf [Prasad Wagle] Make variables final to avoid java 1.7 compiler error, go back to java 1.7 44e5702 [Prasad Wagle] Fix checkstyle error d2052ca [Prasad Wagle] Re-add http://maven.twttr.com repository to resolve com.hadoop.gplcompression:hadoop-lzo:jar:0.4.19 dependency 301f126 [Prasad Wagle] Change scalding.version to 0.16.1-RC1 a84ea3e [Prasad Wagle] Use oraclejdk8 in travis.yml 9060073 [Prasad Wagle] Move ZeppelinScaldingShell to org.apache.zeppelin.scalding, use java 1.8, scala 2.11 0e43d00 [Prasad Wagle] Add http://maven.twttr.com repository to resolve com.hadoop.gplcompression:hadoop-lzo:jar:0.4.19 dependency 
5908935 [Prasad Wagle] Scalding interpreter that works in hdfs mode Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/96dbc665 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/96dbc665 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/96dbc665 Branch: refs/heads/master Commit: 96dbc6656be6df5a19fd01af6b147327702e4392 Parents: 43baa0a Author: Prasad Wagle <[email protected]> Authored: Tue Jun 7 07:58:36 2016 -0700 Committer: Lee moon soo <[email protected]> Committed: Thu Jun 9 09:11:49 2016 -0700 ---------------------------------------------------------------------- docs/interpreter/scalding.md | 86 +++++++- scalding/pom.xml | 49 ++++- .../zeppelin/scalding/ScaldingInterpreter.java | 219 +++++++++---------- .../zeppelin/scalding/ScaldingILoop.scala | 111 ---------- .../zeppelin/scalding/ZeppelinReplState.scala | 48 ++++ .../scalding/ZeppelinScaldingLoop.scala | 46 ++++ .../scalding/ZeppelinScaldingShell.scala | 72 ++++++ .../scalding/ScaldingInterpreterTest.java | 3 +- 8 files changed, 390 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/docs/interpreter/scalding.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md index 4430312..ec5608b 100644 --- a/docs/interpreter/scalding.md +++ b/docs/interpreter/scalding.md @@ -28,10 +28,49 @@ In a notebook, to enable the **Scalding** interpreter, click on the **Gear** ico </center> ### Configuring the Interpreter -Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything. 
+ +The Scalding interpreter runs in two modes: + +* local +* hdfs + +In local mode, you can access files on the local server, and Scalding transformations are run locally. + +In hdfs mode, you can access files in HDFS, and Scalding transformations are run as Hadoop map-reduce jobs. + +Zeppelin comes with a pre-configured Scalding interpreter in local mode. + +To run the Scalding interpreter in hdfs mode, do the following: + +**Set the classpath with ZEPPELIN\_CLASSPATH\_OVERRIDES** + +In conf/zeppelin-env.sh, set +ZEPPELIN_CLASSPATH_OVERRIDES to the output of 'hadoop classpath', +plus any directories with custom jar files you need for your Scalding commands. + +**Set arguments to the Scalding REPL** + +The default arguments are: "--local --repl" + +For hdfs mode, use: "--hdfs --repl" + +If you want to add custom jars, append: +"-libjars directory/*:directory/*" + +For reducer estimation, add something like: +"-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator" + +**Set max.open.instances** + +To control the maximum number of open interpreters, select the "scoped" per-note interpreter +option and set the max.open.instances argument. ### Testing the Interpreter + +#### Local mode + +As an example, using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, +we will count words (of course!) and plot a graph of the top 10 words in the book. 
``` %scalding @@ -71,7 +110,44 @@ print("%table " + table) If you click on the icon for the pie chart, you should be able to see a chart like this:  -### Current Status & Future Work -The current implementation of the Scalding interpreter does not support canceling jobs, or fine-grained progress updates. -The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!). +#### HDFS mode + +**Test mode** + +``` +%scalding +mode +``` +This command should print: + +``` +res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml) +``` + + +**Test HDFS read** + +``` +val testfile = TypedPipe.from(TextLine("/user/x/testfile")) +testfile.dump +``` + +This command should print the contents of the HDFS file /user/x/testfile. + +**Test map-reduce job** + +``` +val testfile = TypedPipe.from(TextLine("/user/x/testfile")) +val a = testfile.groupAll.size.values +a.toList + +``` + +This command should create a map-reduce job. 
+ +### Future Work +* Better user feedback (hadoop url, progress updates) +* Ability to cancel jobs +* Ability to dynamically load jars without restarting the interpreter +* Multiuser scalability (run scalding interpreters on different servers) http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/pom.xml ---------------------------------------------------------------------- diff --git a/scalding/pom.xml b/scalding/pom.xml index 2b04f66..a3b3b58 100644 --- a/scalding/pom.xml +++ b/scalding/pom.xml @@ -34,9 +34,9 @@ <url>http://zeppelin.apache.org</url> <properties> - <scala.version>2.10.4</scala.version> - <hadoop.version>2.3.0</hadoop.version> - <scalding.version>0.15.1-RC13</scalding.version> + <scala.version>2.11.8</scala.version> + <hadoop.version>2.6.0</hadoop.version> + <scalding.version>0.16.1-RC1</scalding.version> </properties> <repositories> @@ -45,6 +45,11 @@ <name>Concurrent Maven Repo</name> <url>http://conjars.org/repo</url> </repository> + <repository> + <id>twitter</id> + <name>Twitter Maven Repo</name> + <url>http://maven.twttr.com</url> + </repository> </repositories> <dependencies> @@ -69,13 +74,43 @@ <dependency> <groupId>com.twitter</groupId> - <artifactId>scalding-core_2.10</artifactId> + <artifactId>scalding-core_2.11</artifactId> + <version>${scalding.version}</version> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>scalding-args_2.11</artifactId> + <version>${scalding.version}</version> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>scalding-date_2.11</artifactId> + <version>${scalding.version}</version> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>scalding-commons_2.11</artifactId> + <version>${scalding.version}</version> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>scalding-avro_2.11</artifactId> <version>${scalding.version}</version> </dependency> <dependency> 
<groupId>com.twitter</groupId> - <artifactId>scalding-repl_2.10</artifactId> + <artifactId>scalding-parquet_2.11</artifactId> + <version>${scalding.version}</version> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>scalding-repl_2.11</artifactId> <version>${scalding.version}</version> </dependency> @@ -97,12 +132,12 @@ <version>${scala.version}</version> </dependency> - <!-- Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode --> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> + <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java ---------------------------------------------------------------------- diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java index e808e70..4542297 100644 --- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -17,35 +17,29 @@ package org.apache.zeppelin.scalding; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.PrintStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.net.URL; -import java.net.URLClassLoader; - +import com.twitter.scalding.ScaldingILoop; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import 
org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Console; -import scala.Some; -import scala.None; -import scala.tools.nsc.Settings; -import scala.tools.nsc.settings.MutableSettings.BooleanSetting; -import scala.tools.nsc.settings.MutableSettings.PathSetting; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; /** * Scalding interpreter for Zeppelin. Based off the Spark interpreter code. @@ -54,16 +48,29 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting; public class ScaldingInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class); + static final String ARGS_STRING = "args.string"; + static final String ARGS_STRING_DEFAULT = "--local --repl"; + static final String MAX_OPEN_INSTANCES = "max.open.instances"; + static final String MAX_OPEN_INSTANCES_DEFAULT = "50"; + public static final List<String> NO_COMPLETION = Collections.unmodifiableList(new ArrayList<String>()); static { - Interpreter.register("scalding", ScaldingInterpreter.class.getName()); + Interpreter.register( + "scalding", + "scalding", + ScaldingInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(ARGS_STRING, ARGS_STRING_DEFAULT, "Arguments for scalding REPL") + .add(MAX_OPEN_INSTANCES, MAX_OPEN_INSTANCES_DEFAULT, + "Maximum number of open interpreter instances") + .build()); } + static int numOpenInstances = 0; private ScaldingILoop interpreter; private ByteArrayOutputStream out; - private Map<String, Object> binder; public 
ScaldingInterpreter(Properties property) { super(property); @@ -72,104 +79,34 @@ public class ScaldingInterpreter extends Interpreter { @Override public void open() { - URL[] urls = getClassloaderUrls(); - - // Very nice discussion about how scala compiler handle classpath - // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0 - - /* - * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new - * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through - * nsc.Settings.classpath. - * - * >> val settings = new Settings() >> settings.usejavacp.value = true >> - * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >> - * val in = new Interpreter(settings) { >> override protected def parentClassLoader = - * getClass.getClassLoader >> } >> in.setContextClassLoader() - */ - Settings settings = new Settings(); - - // set classpath for scala compiler - PathSetting pathSettings = settings.classpath(); - String classpath = ""; - List<File> paths = currentClassPath(); - for (File f : paths) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += f.getAbsolutePath(); + numOpenInstances = numOpenInstances + 1; + String maxOpenInstancesStr = property.getProperty(MAX_OPEN_INSTANCES, + MAX_OPEN_INSTANCES_DEFAULT); + int maxOpenInstances = 50; + try { + maxOpenInstances = Integer.valueOf(maxOpenInstancesStr); + } catch (Exception e) { + logger.error("Error reading max.open.instances", e); } - - if (urls != null) { - for (URL u : urls) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += u.getFile(); - } + logger.info("max.open.instances = {}", maxOpenInstances); + if (numOpenInstances > maxOpenInstances) { + logger.error("Reached maximum number of open instances"); + return; } - - pathSettings.v_$eq(classpath); - settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); - - 
- // set classloader for scala compiler - settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread() - .getContextClassLoader())); - BooleanSetting b = (BooleanSetting) settings.usejavacp(); - b.v_$eq(true); - settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - - /* Scalding interpreter */ - PrintStream printStream = new PrintStream(out); - interpreter = new ScaldingILoop(null, new PrintWriter(out)); - interpreter.settings_$eq(settings); - interpreter.createInterpreter(); - - interpreter.intp(). - interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - binder = (Map<String, Object>) getValue("_binder"); - binder.put("out", printStream); - } - - private Object getValue(String name) { - Object ret = interpreter.intp().valueOfTerm(name); - if (ret instanceof None) { - return null; - } else if (ret instanceof Some) { - return ((Some) ret).get(); + logger.info("Opening instance {}", numOpenInstances); + logger.info("property: {}", property); + String argsString = property.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT); + String[] args; + if (argsString == null) { + args = new String[0]; } else { - return ret; - } - } - - private List<File> currentClassPath() { - List<File> paths = classPath(Thread.currentThread().getContextClassLoader()); - String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); - if (cps != null) { - for (String cp : cps) { - paths.add(new File(cp)); - } - } - return paths; - } - - private List<File> classPath(ClassLoader cl) { - List<File> paths = new LinkedList<File>(); - if (cl == null) { - return paths; + args = argsString.split(" "); } + logger.info("{}", Arrays.toString(args)); - if (cl instanceof URLClassLoader) { - URLClassLoader ucl = (URLClassLoader) cl; - URL[] urls = ucl.getURLs(); - if (urls != null) { - for (URL url : urls) { - paths.add(new File(url.getFile())); - } - } - } - return paths; + PrintWriter printWriter = new PrintWriter(out, 
true); + interpreter = ZeppelinScaldingShell.getRepl(args, printWriter); + interpreter.createInterpreter(); } @Override @@ -180,12 +117,49 @@ public class ScaldingInterpreter extends Interpreter { @Override public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { - logger.info("Running Scalding command '" + cmd + "'"); - + String user = contextInterpreter.getAuthenticationInfo().getUser(); + logger.info("Running Scalding command: user: {} cmd: '{}'", user, cmd); + + if (interpreter == null) { + logger.error( + "interpreter == null, open may not have been called because max.open.instances reached"); + return new InterpreterResult(Code.ERROR, + "interpreter == null\n" + + "open may not have been called because max.open.instances reached" + ); + } if (cmd == null || cmd.trim().length() == 0) { return new InterpreterResult(Code.SUCCESS); } - return interpret(cmd.split("\n"), contextInterpreter); + InterpreterResult interpreterResult = new InterpreterResult(Code.ERROR); + if (property.getProperty(ARGS_STRING).contains("hdfs")) { + UserGroupInformation ugi = null; + try { + ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); + } catch (IOException e) { + logger.error("Error creating UserGroupInformation", e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + try { + // Make variables final to avoid "local variable is accessed from within inner class; + // needs to be declared final" exception in JDK7 + final String cmd1 = cmd; + final InterpreterContext contextInterpreter1 = contextInterpreter; + PrivilegedExceptionAction<InterpreterResult> action = + new PrivilegedExceptionAction<InterpreterResult>() { + public InterpreterResult run() throws Exception { + return interpret(cmd1.split("\n"), contextInterpreter1); + } + }; + interpreterResult = ugi.doAs(action); + } catch (Exception e) { + logger.error("Error running command with ugi.doAs", e); + return new InterpreterResult(Code.ERROR, 
e.getMessage()); + } + } else { + interpreterResult = interpret(cmd.split("\n"), contextInterpreter); + } + return interpreterResult; } public InterpreterResult interpret(String[] lines, InterpreterContext context) { @@ -205,8 +179,13 @@ public class ScaldingInterpreter extends Interpreter { } linesToRun[lines.length] = "print(\"\")"; - Console.setOut((java.io.PrintStream) binder.get("out")); out.reset(); + + // Moving two lines below from open() to this function. + // If they are in open output is incomplete. + PrintStream printStream = new PrintStream(out, true); + Console.setOut(printStream); + Code r = null; String incomplete = ""; boolean inComment = false; @@ -261,7 +240,6 @@ public class ScaldingInterpreter extends Interpreter { incomplete = ""; } } - if (r == Code.INCOMPLETE) { return new InterpreterResult(r, "Incomplete expression"); } else { @@ -306,4 +284,5 @@ public class ScaldingInterpreter extends Interpreter { public List<String> completion(String buf, int cursor) { return NO_COMPLETION; } + } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala ---------------------------------------------------------------------- diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala deleted file mode 100644 index bd23c49..0000000 --- a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scalding; - -import java.io.{BufferedReader, File, FileReader} - -import scala.tools.nsc.GenericRunnerSettings -import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter} - - -/** - * A class providing Scalding specific commands for inclusion in the Scalding REPL. - * This is currently forked from Scalding, but should eventually make it into Scalding itself: - * https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala - */ - class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter) - extends ILoop(in0, out) { - // def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) - // def this() = this(None, new JPrintWriter(Console.out, true)) - - settings = new GenericRunnerSettings({ s => echo(s) }) - - override def printWelcome() { - val fc = Console.YELLOW - val wc = Console.RED - def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc) - echo(fc + - " ( \n" + - " )\\ ) ( ( \n" + - "(()/( ) )\\ )\\ ) ( ( ( \n" + - " /(_)) ( ( /( ((_)(()/( )\\ ( )\\))( \n" + - "(_)) )\\ )( )) _ ((_)(( ) )\\ ) (( ))\\ \n".replaceAll("_", wc + "_" + fc) + wc + - wrapFlames("/ __|((_) ((_)_ | | _| | (_) _(_(( (_()_) \n") + - wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\ \n") + - "|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, | \n" + - " |___/ ") - } - - /** - * Commands specific to the Scalding REPL. 
To define a new command use one of the following - * factory methods: - * - `LoopCommand.nullary` for commands that take no arguments - * - `LoopCommand.cmd` for commands that take one string argument - * - `LoopCommand.varargs` for commands that take multiple string arguments - */ - private val scaldingCommands: List[LoopCommand] = List() - - /** - * Change the shell prompt to read scalding> - * - * @return a prompt string to use for this REPL. - */ - override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET - - private[this] def addImports(ids: String*): IR.Result = - if (ids.isEmpty) IR.Success - else intp.interpret("import " + ids.mkString(", ")) - - /** - * Search for files with the given name in all directories from current directory - * up to root. - */ - private def findAllUpPath(filename: String): List[File] = - Iterator.iterate(System.getProperty("user.dir"))(new File(_).getParent) - .takeWhile(_ != "/") - .flatMap(new File(_).listFiles.filter(_.toString.endsWith(filename))) - .toList - - /** - * Gets the list of commands that this REPL supports. - * - * @return a list of the command supported by this REPL. 
- */ - override def commands: List[LoopCommand] = super.commands ++ scaldingCommands - - protected def imports: List[String] = List( - "com.twitter.scalding._", - "com.twitter.scalding.ReplImplicits._", - "com.twitter.scalding.ReplImplicitContext._", - "com.twitter.scalding.ReplState._") - - override def createInterpreter() { - super.createInterpreter() - intp.beQuietDuring { - addImports(imports: _*) - - settings match { - case s: GenericRunnerSettings => - findAllUpPath(".scalding_repl").reverse.foreach { - f => s.loadfiles.appendToValue(f.toString) - } - case _ => () - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala ---------------------------------------------------------------------- diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala new file mode 100644 index 0000000..b847eba --- /dev/null +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.scalding + +/** + * Stores REPL state + */ + +import cascading.flow.FlowDef +import com.twitter.scalding.BaseReplState +import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext } +import scala.concurrent.Future +import scala.util.{Failure, Success} + +object ZeppelinReplState extends BaseReplState { + override def shell = ZeppelinScaldingShell +} + +/** + * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly + * used everywhere. + */ +object ZeppelinReplImplicitContext { + /** Implicit execution context for using the Execution monad */ + implicit val executionContext = ConcurrentExecutionContext.global + /** Implicit repl state used for ShellPipes */ + implicit def stateImpl = ZeppelinReplState + /** Implicit flowDef for this Scalding shell session. */ + implicit def flowDefImpl = ZeppelinReplState.flowDef + /** Defaults to running in local mode if no mode is specified. */ + implicit def modeImpl = ZeppelinReplState.mode + implicit def configImpl = ZeppelinReplState.config +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala ---------------------------------------------------------------------- diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala new file mode 100644 index 0000000..9be0199 --- /dev/null +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding + +import java.io.BufferedReader +import com.twitter.scalding.ScaldingILoop + +import scala.tools.nsc.interpreter._ + +/** + * TBD + */ +class ZeppelinScaldingILoop(in: Option[BufferedReader], out: JPrintWriter) + extends ScaldingILoop(in, out) { + + override protected def imports = List( + "com.twitter.scalding.{ ScaldingILoop => ScaldingScaldingILoop, ScaldingShell => ScaldingScaldingShell, _ }", + // ReplImplicits minus fields API parts (esp FieldConversions) + """com.twitter.scalding.ReplImplicits.{ + iterableToSource, + keyedListLikeToShellTypedPipe, + typedPipeToShellTypedPipe, + valuePipeToShellValuePipe + }""", + "com.twitter.scalding.ReplImplicits", + "org.apache.zeppelin.scalding.ZeppelinReplImplicitContext._", + "org.apache.zeppelin.scalding.ZeppelinReplState", + "org.apache.zeppelin.scalding.ZeppelinReplState._" + ) + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala ---------------------------------------------------------------------- diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala new file mode 100644 index 0000000..29e5f83 --- /dev/null +++ 
b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding + +import com.twitter.scalding._ +import com.twitter.scalding.typed.TypedPipe +import scala.tools.nsc.{GenericRunnerCommand} +import scala.tools.nsc.interpreter._ + +/** + * TBD + */ +object ZeppelinScaldingShell extends BaseScaldingShell { + + override def replState = ZeppelinReplState + + def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = { + + val argsExpanded = ExpandLibJarsGlobs(args) + val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded) + + // Process command line arguments into a settings object, and use that to start the REPL. 
+ // We ignore params we don't care about - hence error function is empty + val command = new GenericRunnerCommand(cmdArgs, _ => ()) + + // inherit defaults for embedded interpreter (needed for running with SBT) + // (TypedPipe chosen arbitrarily, just needs to be something representative) + command.settings.embeddedDefaults[TypedPipe[String]] + + // if running from the assembly, need to explicitly tell it to use java classpath + if (args.contains("--repl")) command.settings.usejavacp.value = true + + command.settings.classpath.append(System.getProperty("java.class.path")) + + // Force the repl to be synchronous, so all cmds are executed in the same thread + command.settings.Yreplsync.value = true + + val repl = new ZeppelinScaldingILoop(None, out) + scaldingREPL = Some(repl) + replState.mode = mode + replState.customConfig = replState.customConfig ++ (mode match { + case _: HadoopMode => cfg + case _ => Config.empty + }) + + // if in Hdfs mode, store the mode to enable switching between Local and Hdfs + mode match { + case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m) + case _ => () + } + + repl.settings = command.settings + return repl; + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java ---------------------------------------------------------------------- diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java index 08c67da..7ffbd97 100644 --- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java +++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java @@ -57,6 +57,7 @@ public class ScaldingInterpreterTest { if (repl == null) { Properties p = new Properties(); + p.setProperty(ScaldingInterpreter.ARGS_STRING, "--local --repl"); repl = new ScaldingInterpreter(p); 
repl.open(); @@ -119,7 +120,7 @@ public class ScaldingInterpreterTest { "val salesPipe = TypedPipe.from(salesList)\n" + "val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" + " groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" + - "results.dump", + "results.dump", context).code()); }
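As a minimal sketch of the hdfs-mode setup that the docs/interpreter/scalding.md changes above describe: the Hadoop classpath value and the /opt/custom-jars directory below are illustrative assumptions, not values taken from this commit.

```shell
# Sketch only: the classpath stand-in and /opt/custom-jars are hypothetical.

# In conf/zeppelin-env.sh you would normally use "$(hadoop classpath)";
# a literal stand-in keeps this example self-contained.
HADOOP_CP="/etc/hadoop/conf:/usr/lib/hadoop/lib/*"
export ZEPPELIN_CLASSPATH_OVERRIDES="${HADOOP_CP}:/opt/custom-jars/*"

# In the interpreter settings, args.string replaces the default
# "--local --repl" with hdfs mode, custom jars, and a reducer estimator.
ARGS_STRING="--hdfs --repl -libjars /opt/custom-jars/* -Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator"
echo "args.string=${ARGS_STRING}"
```

With settings like these, the ScaldingInterpreter in this commit splits args.string on spaces and, because it contains "hdfs", runs each paragraph under a proxy UserGroupInformation for the notebook user.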
