http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java index 25ad407..6f157a0 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java @@ -17,20 +17,6 @@ package org.apache.zeppelin.spark; -import java.io.File; -import java.io.IOException; -import java.io.PrintWriter; -import java.lang.reflect.Constructor; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.spark.JobProgressUtil; @@ -78,24 +64,41 @@ import scala.tools.nsc.settings.MutableSettings; import scala.tools.nsc.settings.MutableSettings.BooleanSetting; import scala.tools.nsc.settings.MutableSettings.PathSetting; -/** Spark interpreter for Zeppelin. */ +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Spark interpreter for Zeppelin. + * + */ public class OldSparkInterpreter extends AbstractSparkInterpreter { public static Logger logger = LoggerFactory.getLogger(OldSparkInterpreter.class); private SparkZeppelinContext z; private SparkILoop interpreter; /** - * intp - org.apache.spark.repl.SparkIMain (scala 2.10) intp - scala.tools.nsc.interpreter.IMain; - * (scala 2.11) + * intp - org.apache.spark.repl.SparkIMain (scala 2.10) + * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11) */ private Object intp; - private SparkConf conf; private static SparkContext sc; private static SQLContext sqlc; private static InterpreterHookRegistry hooks; private static SparkEnv env; - private static Object sparkSession; // spark 2.x + private static Object sparkSession; // spark 2.x private static SparkListener sparkListener; private static AbstractFile classOutputDir; private static Integer sharedInterpreterLock = new Integer(0); @@ -105,13 +108,15 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { private SparkDependencyResolver dep; private static String sparkUrl; - /** completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) */ + /** + * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) + */ private Object completer = null; private Map<String, Object> binder; private SparkVersion sparkVersion; - private static File outputDir; // class outputdir for scala 2.11 - private Object classServer; // classserver for scala 2.11 + private static File outputDir; // class outputdir for scala 2.11 + private Object classServer; // classserver for scala 2.11 private JavaSparkContext jsc; private boolean enableSupportedVersionCheck; @@ -159,7 +164,6 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { /** * See org.apache.spark.sql.SparkSession.hiveClassesArePresent - * * @return */ private boolean hiveClassesArePresent() { @@ -195,7 +199,9 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { } } - /** Get SQLContext for spark 2.x */ + /** + * Get SQLContext for spark 2.x + */ private SQLContext getSQLContext_2() { if (sqlc == null) { sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "sqlContext"); @@ -209,14 +215,12 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { String name = "org.apache.spark.sql.hive.HiveContext"; Constructor<?> hc; try { - hc = getClass().getClassLoader().loadClass(name).getConstructor(SparkContext.class); + hc = getClass().getClassLoader().loadClass(name) + .getConstructor(SparkContext.class); sqlc = (SQLContext) hc.newInstance(getSparkContext()); - } catch (NoSuchMethodException - | SecurityException - | ClassNotFoundException - | InstantiationException - | IllegalAccessException - | IllegalArgumentException + } catch (NoSuchMethodException | SecurityException + | ClassNotFoundException | InstantiationException + | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { logger.warn("Can't create HiveContext. Fallback to SQLContext", e); // when hive dependency is not loaded, it'll fail. @@ -230,16 +234,15 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { return sqlc; } + public SparkDependencyResolver getDependencyResolver() { if (dep == null) { - dep = - new SparkDependencyResolver( - (Global) Utils.invokeMethod(intp, "global"), - (ClassLoader) - Utils.invokeMethod(Utils.invokeMethod(intp, "classLoader"), "getParent"), - sc, - getProperty("zeppelin.dep.localrepo"), - getProperty("zeppelin.dep.additionalRemoteRepository")); + dep = new SparkDependencyResolver( + (Global) Utils.invokeMethod(intp, "global"), + (ClassLoader) Utils.invokeMethod(Utils.invokeMethod(intp, "classLoader"), "getParent"), + sc, + getProperty("zeppelin.dep.localrepo"), + getProperty("zeppelin.dep.additionalRemoteRepository")); } return dep; } @@ -252,7 +255,10 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { return master.startsWith("yarn"); } - /** Spark 2.x Create SparkSession */ + /** + * Spark 2.x + * Create SparkSession + */ public Object createSparkSession() { // use local mode for embedded spark mode when spark.master is not found conf.setIfMissing("spark.master", "local"); @@ -286,7 +292,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession"); Object builder = Utils.invokeStaticMethod(SparkSession, "builder"); - Utils.invokeMethod(builder, "config", new Class[] {SparkConf.class}, new Object[] {conf}); + Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf }); if (useHiveContext()) { if (hiveClassesArePresent()) { @@ -294,11 +300,9 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { sparkSession = Utils.invokeMethod(builder, "getOrCreate"); logger.info("Created Spark session with Hive support"); } else { - Utils.invokeMethod( - builder, - "config", - new Class[] {String.class, String.class}, - new Object[] {"spark.sql.catalogImplementation", "in-memory"}); + Utils.invokeMethod(builder, "config", + new Class[]{ String.class, String.class}, + new Object[]{ "spark.sql.catalogImplementation", "in-memory"}); sparkSession = Utils.invokeMethod(builder, "getOrCreate"); logger.info("Created Spark session with Hive support use in-memory catalogImplementation"); } @@ -320,7 +324,6 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { /** * Create SparkContext for spark 2.x - * * @return */ private SparkContext createSparkContext_2() { @@ -340,10 +343,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { if (Utils.isScala2_10()) { jars = (String[]) Utils.invokeStaticMethod(SparkILoop.class, "getAddedJars"); } else { - jars = - (String[]) - Utils.invokeStaticMethod( - Utils.findClass("org.apache.spark.repl.Main"), "getAddedJars"); + jars = (String[]) Utils.invokeStaticMethod( + Utils.findClass("org.apache.spark.repl.Main"), "getAddedJars"); } String classServerUri = null; @@ -353,11 +354,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { Method classServer = intp.getClass().getMethod("classServer"); Object httpServer = classServer.invoke(intp); classServerUri = (String) Utils.invokeMethod(httpServer, "uri"); - } catch (NoSuchMethodException - | SecurityException - | IllegalAccessException - | IllegalArgumentException - | InvocationTargetException e) { + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { // continue } @@ -365,16 +363,12 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { try { // for spark 1.3x Method classServer = intp.getClass().getMethod("classServerUri"); classServerUri = (String) classServer.invoke(intp); - } catch (NoSuchMethodException - | SecurityException - | IllegalAccessException - | IllegalArgumentException - | InvocationTargetException e) { + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { // continue instead of: throw new InterpreterException(e); // Newer Spark versions (like the patched CDH5.7.0 one) don't contain this method - logger.warn( - String.format( - "Spark method classServerUri not available due to: [%s]", e.getMessage())); + logger.warn(String.format("Spark method classServerUri not available due to: [%s]", + e.getMessage())); } } @@ -383,11 +377,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { Method getClassOutputDirectory = intp.getClass().getMethod("getClassOutputDirectory"); File classOutputDirectory = (File) getClassOutputDirectory.invoke(intp); replClassOutputDirectory = classOutputDirectory.getAbsolutePath(); - } catch (NoSuchMethodException - | SecurityException - | IllegalAccessException - | IllegalArgumentException - | InvocationTargetException e) { + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { // continue } } @@ -450,16 +441,15 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { @Override public void open() throws InterpreterException { - this.enableSupportedVersionCheck = - java.lang.Boolean.parseBoolean( - getProperty("zeppelin.spark.enableSupportedVersionCheck", "true")); + this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean( + getProperty("zeppelin.spark.enableSupportedVersionCheck", "true")); // set properties and do login before creating any spark stuff for secured cluster if (isYarnMode()) { System.setProperty("SPARK_YARN_MODE", "true"); } - if (getProperties().containsKey("spark.yarn.keytab") - && getProperties().containsKey("spark.yarn.principal")) { + if (getProperties().containsKey("spark.yarn.keytab") && + getProperties().containsKey("spark.yarn.principal")) { try { String keytab = getProperties().getProperty("spark.yarn.keytab"); String principal = getProperties().getProperty("spark.yarn.principal"); @@ -499,9 +489,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { argList.add(arg); } - DepInterpreter depInterpreter = - getParentSparkInterpreter() - .getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false); + DepInterpreter depInterpreter = getParentSparkInterpreter(). + getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false); String depInterpreterClasspath = ""; if (depInterpreter != null) { SparkDependencyContext depc = depInterpreter.getDependencyContext(); @@ -518,15 +507,15 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { } } + if (Utils.isScala2_10()) { scala.collection.immutable.List<String> list = JavaConversions.asScalaBuffer(argList).toList(); - Object sparkCommandLine = - Utils.instantiateClass( - "org.apache.spark.repl.SparkCommandLine", - new Class[] {scala.collection.immutable.List.class}, - new Object[] {list}); + Object sparkCommandLine = Utils.instantiateClass( + "org.apache.spark.repl.SparkCommandLine", + new Class[]{ scala.collection.immutable.List.class }, + new Object[]{ list }); settings = (Settings) Utils.invokeMethod(sparkCommandLine, "settings"); } else { @@ -618,7 +607,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); // set classloader for scala compiler - settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread().getContextClassLoader())); + settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread() + .getContextClassLoader())); BooleanSetting b = (BooleanSetting) settings.usejavacp(); b.v_$eq(true); settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); @@ -652,8 +642,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { if (printREPLOutput()) { this.interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out)); } else { - this.interpreter = - new SparkILoop((java.io.BufferedReader) null, new PrintWriter(Console.out(), false)); + this.interpreter = new SparkILoop((java.io.BufferedReader) null, + new PrintWriter(Console.out(), false)); } interpreter.settings_$eq(settings); @@ -682,24 +672,22 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { } if (Utils.findClass("org.apache.spark.repl.SparkJLineCompletion", true) != null) { - completer = - Utils.instantiateClass( - "org.apache.spark.repl.SparkJLineCompletion", - new Class[] {Utils.findClass("org.apache.spark.repl.SparkIMain")}, - new Object[] {intp}); - } else if (Utils.findClass("scala.tools.nsc.interpreter.PresentationCompilerCompleter", true) - != null) { - completer = - Utils.instantiateClass( - "scala.tools.nsc.interpreter.PresentationCompilerCompleter", - new Class[] {IMain.class}, - new Object[] {intp}); - } else if (Utils.findClass("scala.tools.nsc.interpreter.JLineCompletion", true) != null) { - completer = - Utils.instantiateClass( - "scala.tools.nsc.interpreter.JLineCompletion", - new Class[] {IMain.class}, - new Object[] {intp}); + completer = Utils.instantiateClass( + "org.apache.spark.repl.SparkJLineCompletion", + new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")}, + new Object[]{intp}); + } else if (Utils.findClass( + "scala.tools.nsc.interpreter.PresentationCompilerCompleter", true) != null) { + completer = Utils.instantiateClass( + "scala.tools.nsc.interpreter.PresentationCompilerCompleter", + new Class[]{ IMain.class }, + new Object[]{ intp }); + } else if (Utils.findClass( + "scala.tools.nsc.interpreter.JLineCompletion", true) != null) { + completer = Utils.instantiateClass( + "scala.tools.nsc.interpreter.JLineCompletion", + new Class[]{ IMain.class }, + new Object[]{ intp }); } if (Utils.isSpark2()) { @@ -723,9 +711,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { sparkShims.setupSparkListener(sc.master(), sparkUrl, InterpreterContext.get()); numReferenceOfSparkContext.incrementAndGet(); - z = - new SparkZeppelinContext( - sc, sparkShims, hooks, Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); + z = new SparkZeppelinContext(sc, sparkShims, hooks, + Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); interpret("@transient val _binder = new java.util.HashMap[String, Object]()"); Map<String, Object> binder; @@ -742,23 +729,18 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { binder.put("spark", sparkSession); } - interpret( - "@transient val z = " - + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.SparkZeppelinContext]"); - interpret( - "@transient val sc = " - + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]"); - interpret( - "@transient val sqlc = " - + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); - interpret( - "@transient val sqlContext = " - + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); + interpret("@transient val z = " + + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.SparkZeppelinContext]"); + interpret("@transient val sc = " + + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]"); + interpret("@transient val sqlc = " + + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); + interpret("@transient val sqlContext = " + + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); if (Utils.isSpark2()) { - interpret( - "@transient val spark = " - + "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]"); + interpret("@transient val spark = " + + "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]"); } interpret("import org.apache.spark.SparkContext._"); @@ -789,16 +771,11 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { if (Utils.isScala2_10()) { try { - Method loadFiles = - this.interpreter - .getClass() - .getMethod("org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class); + Method loadFiles = this.interpreter.getClass().getMethod( + "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class); loadFiles.invoke(this.interpreter, settings); - } catch (NoSuchMethodException - | SecurityException - | IllegalAccessException - | IllegalArgumentException - | InvocationTargetException e) { + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { throw new InterpreterException(e); } } @@ -840,6 +817,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { } } } + } public String getSparkUIUrl() { @@ -868,8 +846,11 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { private Results.Result interpret(String line) { out.ignoreLeadingNewLinesFromScalaReporter(); - return (Results.Result) - Utils.invokeMethod(intp, "interpret", new Class[] {String.class}, new Object[] {line}); + return (Results.Result) Utils.invokeMethod( + intp, + "interpret", + new Class[] {String.class}, + new Object[] {line}); } private List<File> currentClassPath() { @@ -902,8 +883,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { } @Override - public List<InterpreterCompletion> completion( - String buf, int cursor, InterpreterContext interpreterContext) { + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { if (completer == null) { logger.warn("Can't find completer"); return new LinkedList<>(); @@ -951,7 +932,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { String completionScriptText = ""; try { completionScriptText = text.substring(0, cursor); - } catch (Exception e) { + } + catch (Exception e) { logger.error(e.toString()); return null; } @@ -965,15 +947,18 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { if (indexOfReverseSeqPostion < completionStartPosition && indexOfReverseSeqPostion > 0) { completionStartPosition = indexOfReverseSeqPostion; } + } if (completionStartPosition == completionEndPosition) { completionStartPosition = 0; - } else { + } + else + { completionStartPosition = completionEndPosition - completionStartPosition; } - resultCompletionText = - completionScriptText.substring(completionStartPosition, completionEndPosition); + resultCompletionText = completionScriptText.substring( + completionStartPosition , completionEndPosition); return resultCompletionText; } @@ -983,8 +968,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { * Somehow intp.valueOfTerm returns scala.None always with -Yrepl-class-based option */ public Object getValue(String name) { - Object ret = - Utils.invokeMethod(intp, "valueOfTerm", new Class[] {String.class}, new Object[] {name}); + Object ret = Utils.invokeMethod( + intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name}); if (ret instanceof None || ret instanceof scala.None$) { return null; @@ -1000,20 +985,23 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { if (r == null || r.lineRep() == null) { return null; } - Object obj = r.lineRep().call("$result", JavaConversions.asScalaBuffer(new LinkedList<>())); + Object obj = r.lineRep().call("$result", + JavaConversions.asScalaBuffer(new LinkedList<>())); return obj; } public boolean isUnsupportedSparkVersion() { - return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion(); + return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion(); } - /** Interpret a single line. */ + /** + * Interpret a single line. + */ @Override public InterpreterResult interpret(String line, InterpreterContext context) { if (isUnsupportedSparkVersion()) { - return new InterpreterResult( - Code.ERROR, "Spark " + sparkVersion.toString() + " is not supported"); + return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString() + + " is not supported"); } z.setInterpreterContext(context); if (line == null || line.trim().length() == 0) { @@ -1055,9 +1043,9 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { String nextLine = linesToRun[l + 1].trim(); boolean continuation = false; if (nextLine.isEmpty() - || nextLine.startsWith("//") // skip empty line or comment + || nextLine.startsWith("//") // skip empty line or comment || nextLine.startsWith("}") - || nextLine.startsWith("object")) { // include "} object" for Scala companion object + || nextLine.startsWith("object")) { // include "} object" for Scala companion object continuation = true; } else if (!inComment && nextLine.startsWith("/*")) { inComment = true; @@ -1067,8 +1055,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { continuation = true; } else if (nextLine.length() > 1 && nextLine.charAt(0) == '.' - && nextLine.charAt(1) != '.' // ".." - && nextLine.charAt(1) != '/') { // "./" + && nextLine.charAt(1) != '.' // ".." + && nextLine.charAt(1) != '/') { // "./" continuation = true; } else if (inComment) { continuation = true; @@ -1140,14 +1128,12 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { if (lastObj != null) { ResourcePool resourcePool = context.getResourcePool(); - resourcePool.put( - context.getNoteId(), - context.getParagraphId(), - WellKnownResourceName.ZeppelinReplResult.toString(), - lastObj); + resourcePool.put(context.getNoteId(), context.getParagraphId(), + WellKnownResourceName.ZeppelinReplResult.toString(), lastObj); } }; + @Override public void cancel(InterpreterContext context) { sc.cancelJobGroup(Utils.buildJobGroupId(context)); @@ -1176,7 +1162,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { if (numReferenceOfSparkContext.decrementAndGet() == 0) { if (sparkSession != null) { Utils.invokeMethod(sparkSession, "stop"); - } else if (sc != null) { + } else if (sc != null){ sc.stop(); } sparkSession = null; @@ -1198,8 +1184,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton() - .createOrGetFIFOScheduler(OldSparkInterpreter.class.getName() + this.hashCode()); + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + OldSparkInterpreter.class.getName() + this.hashCode()); } public SparkZeppelinContext getZeppelinContext() { @@ -1214,23 +1200,19 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { File file = null; // try Utils.createTempDir() - file = - (File) - Utils.invokeStaticMethod( - Utils.findClass("org.apache.spark.util.Utils"), - "createTempDir", - new Class[] {String.class, String.class}, - new Object[] {dir, "spark"}); + file = (File) Utils.invokeStaticMethod( + Utils.findClass("org.apache.spark.util.Utils"), + "createTempDir", + new Class[]{String.class, String.class}, + new Object[]{dir, "spark"}); // fallback to old method if (file == null) { - file = - (File) - Utils.invokeStaticMethod( - Utils.findClass("org.apache.spark.util.Utils"), - "createTempDir", - new Class[] {String.class}, - new Object[] {dir}); + file = (File) Utils.invokeStaticMethod( + Utils.findClass("org.apache.spark.util.Utils"), + "createTempDir", + new Class[]{String.class}, + new Object[]{dir}); } return file; @@ -1240,40 +1222,28 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { SparkConf conf = new SparkConf(); try { // try to create HttpServer - Constructor<?> constructor = - getClass() - .getClassLoader() - .loadClass("org.apache.spark.HttpServer") - .getConstructor( - new Class[] { - SparkConf.class, File.class, SecurityManager.class, int.class, String.class - }); + Constructor<?> constructor = getClass().getClassLoader() + .loadClass("org.apache.spark.HttpServer") + .getConstructor(new Class[]{ + SparkConf.class, File.class, SecurityManager.class, int.class, String.class}); Object securityManager = createSecurityManager(conf); - return constructor.newInstance( - new Object[] {conf, outputDir, securityManager, 0, "HTTP Server"}); - - } catch (ClassNotFoundException - | NoSuchMethodException - | IllegalAccessException - | InstantiationException - | InvocationTargetException e) { + return constructor.newInstance(new Object[]{ + conf, outputDir, securityManager, 0, "HTTP Server"}); + + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | + InstantiationException | InvocationTargetException e) { // fallback to old constructor Constructor<?> constructor = null; try { - constructor = - getClass() - .getClassLoader() - .loadClass("org.apache.spark.HttpServer") - .getConstructor( - new Class[] {File.class, SecurityManager.class, int.class, String.class}); - return constructor.newInstance( - new Object[] {outputDir, createSecurityManager(conf), 0, "HTTP Server"}); - } catch (ClassNotFoundException - | NoSuchMethodException - | IllegalAccessException - | InstantiationException - | InvocationTargetException e1) { + constructor = getClass().getClassLoader() + .loadClass("org.apache.spark.HttpServer") + .getConstructor(new Class[]{ + File.class, SecurityManager.class, int.class, String.class}); + return constructor.newInstance(new Object[] { + outputDir, createSecurityManager(conf), 0, "HTTP Server"}); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | + InstantiationException | InvocationTargetException e1) { logger.error(e1.getMessage(), e1); return null; } @@ -1292,23 +1262,19 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { * @throws InvocationTargetException * @throws InstantiationException */ - private Object createSecurityManager(SparkConf conf) - throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, - InvocationTargetException, InstantiationException { + private Object createSecurityManager(SparkConf conf) throws ClassNotFoundException, + NoSuchMethodException, IllegalAccessException, InvocationTargetException, + InstantiationException { Object securityManager = null; try { - Constructor<?> smConstructor = - getClass() - .getClassLoader() - .loadClass("org.apache.spark.SecurityManager") - .getConstructor(new Class[] {SparkConf.class, scala.Option.class}); + Constructor<?> smConstructor = getClass().getClassLoader() + .loadClass("org.apache.spark.SecurityManager") + .getConstructor(new Class[]{ SparkConf.class, scala.Option.class }); securityManager = smConstructor.newInstance(conf, null); } catch (NoSuchMethodException e) { - Constructor<?> smConstructor = - getClass() - .getClassLoader() - .loadClass("org.apache.spark.SecurityManager") - .getConstructor(new Class[] {SparkConf.class}); + Constructor<?> smConstructor = getClass().getClassLoader() + .loadClass("org.apache.spark.SecurityManager") + .getConstructor(new Class[]{ SparkConf.class }); securityManager = smConstructor.newInstance(conf); } return securityManager;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 2481ade..32e805b 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -17,14 +17,6 @@ package org.apache.zeppelin.spark; -import java.io.File; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; import org.apache.commons.lang.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; @@ -39,10 +31,19 @@ import org.apache.zeppelin.spark.dep.SparkDependencyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + /** - * Interpreter for PySpark, it is the first implementation of interpreter for PySpark, so with less - * features compared to IPySparkInterpreter, but requires less prerequisites than - * IPySparkInterpreter, only python is required. + * Interpreter for PySpark, it is the first implementation of interpreter for PySpark, so with less + * features compared to IPySparkInterpreter, but requires less prerequisites than + * IPySparkInterpreter, only python is required. */ public class PySparkInterpreter extends PythonInterpreter { @@ -63,7 +64,7 @@ public class PySparkInterpreter extends PythonInterpreter { DepInterpreter depInterpreter = getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false); // load libraries from Dependency Interpreter - URL[] urls = new URL[0]; + URL [] urls = new URL[0]; List<URL> urlList = new LinkedList<>(); if (depInterpreter != null) { @@ -107,8 +108,7 @@ public class PySparkInterpreter extends PythonInterpreter { // must create spark interpreter after ClassLoader is set, otherwise the additional jars // can not be loaded by spark repl. this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class); - setProperty( - "zeppelin.py4j.useAuth", + setProperty("zeppelin.py4j.useAuth", sparkInterpreter.getSparkVersion().isSecretSocketSupported() + ""); // create Python Process and JVM gateway super.open(); @@ -154,11 +154,9 @@ public class PySparkInterpreter extends PythonInterpreter { protected void preCallPython(InterpreterContext context) { String jobGroup = Utils.buildJobGroupId(context); String jobDesc = Utils.buildJobDesc(context); - callPython( - new PythonInterpretRequest( - String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc), - false, - false)); + callPython(new PythonInterpretRequest( + String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc), + false, false)); String pool = "None"; if (context.getLocalProperties().containsKey("pool")) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PythonUtils.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PythonUtils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PythonUtils.java index df8e9a6..8182690 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PythonUtils.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PythonUtils.java @@ -15,23 +15,27 @@ * limitations under the License. */ + package org.apache.zeppelin.spark; +import org.apache.commons.lang3.StringUtils; + import java.io.File; import java.io.FilenameFilter; import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang3.StringUtils; -/** Util class for PySpark */ +/** + * Util class for PySpark + */ public class PythonUtils { /** * Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from ZEPPELIN_HOME * when it is embedded mode. * - * <p>This method will called in zeppelin server process and spark driver process when it is local - * or yarn-client mode. + * This method will called in zeppelin server process and spark driver process when it is + * local or yarn-client mode. */ public static String sparkPythonPath() { List<String> pythonPath = new ArrayList<String>(); @@ -47,15 +51,12 @@ public class PythonUtils { throw new RuntimeException("No pyspark.zip found under " + sparkHome + "/python/lib"); } pythonPath.add(pyspark.getAbsolutePath()); - File[] py4j = - new File(sparkHome + "/python/lib") - .listFiles( - new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.startsWith("py4j"); - } - }); + File[] py4j = new File(sparkHome + "/python/lib").listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("py4j"); + } + }); if (py4j.length == 0) { throw new RuntimeException("No py4j files found under " + sparkHome + "/python/lib"); } else if (py4j.length > 1) { @@ -70,21 +71,19 @@ public class PythonUtils { throw new RuntimeException("No pyspark.zip found: " + pyspark.getAbsolutePath()); } pythonPath.add(pyspark.getAbsolutePath()); - File[] py4j = - new File(zeppelinHome, "interpreter/spark/pyspark") - .listFiles( - new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.startsWith("py4j"); - } - }); + File[] py4j = new File(zeppelinHome, "interpreter/spark/pyspark").listFiles( + new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("py4j"); + } + }); if (py4j.length == 0) { - throw new RuntimeException( - "No py4j files found under " + zeppelinHome + "/interpreter/spark/pyspark"); + throw new RuntimeException("No py4j files found under " + zeppelinHome + + "/interpreter/spark/pyspark"); } else if (py4j.length > 1) { - throw new RuntimeException( - "Multiple py4j files found under " + sparkHome + "/interpreter/spark/pyspark"); + throw new RuntimeException("Multiple py4j files found under " + sparkHome + + "/interpreter/spark/pyspark"); } else { pythonPath.add(py4j[0].getAbsolutePath()); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 22b482d..dabe9ed 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -17,11 +17,10 @@ package org.apache.zeppelin.spark; -import java.util.List; -import java.util.Properties; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -29,9 +28,12 @@ import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.Properties; + /** - * It is the Wrapper of OldSparkInterpreter & NewSparkInterpreter. Property zeppelin.spark.useNew - * control which one to use. + * It is the Wrapper of OldSparkInterpreter & NewSparkInterpreter. + * Property zeppelin.spark.useNew control which one to use. */ public class SparkInterpreter extends AbstractSparkInterpreter { @@ -40,6 +42,7 @@ public class SparkInterpreter extends AbstractSparkInterpreter { // either OldSparkInterpreter or NewSparkInterpreter private AbstractSparkInterpreter delegation; + public SparkInterpreter(Properties properties) { super(properties); if (Boolean.parseBoolean(properties.getProperty("zeppelin.spark.useNew", "false"))) { @@ -76,8 +79,10 @@ public class SparkInterpreter extends AbstractSparkInterpreter { } @Override - public List<InterpreterCompletion> completion( - String buf, int cursor, InterpreterContext interpreterContext) throws InterpreterException { + public List<InterpreterCompletion> completion(String buf, + int cursor, + InterpreterContext interpreterContext) + throws InterpreterException { return delegation.completion(buf, cursor, interpreterContext); } @@ -95,6 +100,7 @@ public class SparkInterpreter extends AbstractSparkInterpreter { return delegation; } + @Override public SparkContext getSparkContext() { return delegation.getSparkContext(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 337089d..8f55a87 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -17,14 +17,8 @@ package org.apache.zeppelin.spark; -import static org.apache.zeppelin.spark.ZeppelinRDisplay.render; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.spark.SparkContext; import org.apache.spark.SparkRBackend; import org.apache.spark.api.java.JavaSparkContext; @@ -38,7 +32,18 @@ import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** R and SparkR interpreter with visualization support. */ +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.zeppelin.spark.ZeppelinRDisplay.render; + +/** + * R and SparkR interpreter with visualization support. + */ public class SparkRInterpreter extends Interpreter { private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class); @@ -63,7 +68,7 @@ public class SparkRInterpreter extends Interpreter { if (System.getenv("SPARK_HOME") != null) { // local or yarn-client mode when SPARK_HOME is specified sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib"; - } else if (System.getenv("ZEPPELIN_HOME") != null) { + } else if (System.getenv("ZEPPELIN_HOME") != null){ // embedded mode when SPARK_HOME is not specified sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R/lib"; // workaround to make sparkr work without SPARK_HOME @@ -98,8 +103,7 @@ public class SparkRInterpreter extends Interpreter { ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext()); - zeppelinR = - new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, this); + zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, this); try { zeppelinR.open(); } catch (IOException e) { @@ -109,11 +113,9 @@ public class SparkRInterpreter extends Interpreter { if (useKnitr()) { zeppelinR.eval("library('knitr')"); } - renderOptions = - getProperty( - "zeppelin.R.render.options", - "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, " - + "warning = F, fig.retina = 2"); + renderOptions = getProperty("zeppelin.R.render.options", + "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, " + + "warning = F, fig.retina = 2"); } @Override @@ -132,27 +134,26 @@ public class SparkRInterpreter extends Interpreter { String setJobGroup = ""; // assign setJobGroup to dummy__, otherwise it would print NULL for this statement if (isSpark2) { - setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup + "\", \" +" + jobDesc + "\", TRUE)"; + setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup + + "\", \" +" + jobDesc + "\", TRUE)"; } else { - setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup + "\", \"" + jobDesc + "\", TRUE)"; + setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup + + "\", \"" + jobDesc + "\", TRUE)"; } lines = setJobGroup + "\n" + lines; if (sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_3_0)) { // setLocalProperty is only available from spark 2.3.0 String setPoolStmt = "setLocalProperty('spark.scheduler.pool', NULL)"; if (interpreterContext.getLocalProperties().containsKey("pool")) { - setPoolStmt = - "setLocalProperty('spark.scheduler.pool', '" - + interpreterContext.getLocalProperties().get("pool") - + "')"; + setPoolStmt = "setLocalProperty('spark.scheduler.pool', '" + + interpreterContext.getLocalProperties().get("pool") + "')"; } lines = setPoolStmt + "\n" + lines; } try { // render output with knitr if (rbackendDead.get()) { - return new InterpreterResult( - InterpreterResult.Code.ERROR, + return new InterpreterResult(InterpreterResult.Code.ERROR, "sparkR backend is dead, please try to increase spark.r.backendConnectionTimeout"); } if (useKnitr()) { @@ -163,7 +164,11 @@ public class SparkRInterpreter extends Interpreter { RDisplay rDisplay = render(html, imageWidth); - return new InterpreterResult(rDisplay.code(), rDisplay.type(), rDisplay.content()); + return new InterpreterResult( + rDisplay.code(), + rDisplay.type(), + rDisplay.content() + ); } else { // alternatively, stream the output (without knitr) zeppelinR.setInterpreterOutput(interpreterContext.out); @@ -204,13 +209,13 @@ public class SparkRInterpreter extends Interpreter { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton() - .createOrGetFIFOScheduler(SparkRInterpreter.class.getName() + this.hashCode()); + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + SparkRInterpreter.class.getName() + this.hashCode()); } @Override - public List<InterpreterCompletion> completion( - String buf, int cursor, InterpreterContext interpreterContext) { + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { return new ArrayList<>(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 811e5b7..04eb844 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -17,9 +17,6 @@ package org.apache.zeppelin.spark; -import java.lang.reflect.Method; -import java.util.List; -import java.util.Properties; import org.apache.spark.SparkContext; import org.apache.spark.sql.SQLContext; import org.apache.zeppelin.interpreter.Interpreter; @@ -33,7 +30,13 @@ import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Spark SQL interpreter for Zeppelin. */ +import java.lang.reflect.Method; +import java.util.List; +import java.util.Properties; + +/** + * Spark SQL interpreter for Zeppelin. + */ public class SparkSqlInterpreter extends Interpreter { private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class); @@ -59,9 +62,8 @@ public class SparkSqlInterpreter extends Interpreter { public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException { if (sparkInterpreter.isUnsupportedSparkVersion()) { - return new InterpreterResult( - Code.ERROR, - "Spark " + sparkInterpreter.getSparkVersion().toString() + " is not supported"); + return new InterpreterResult(Code.ERROR, "Spark " + + sparkInterpreter.getSparkVersion().toString() + " is not supported"); } sparkInterpreter.getZeppelinContext().setInterpreterContext(context); @@ -71,13 +73,11 @@ public class SparkSqlInterpreter extends Interpreter { sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false); try { - String effectiveSQL = - Boolean.parseBoolean(getProperty("zeppelin.spark.sql.interpolation")) - ? interpolate(st, context.getResourcePool()) - : st; + String effectiveSQL = Boolean.parseBoolean(getProperty("zeppelin.spark.sql.interpolation")) ? + interpolate(st, context.getResourcePool()) : st; Method method = sqlc.getClass().getMethod("sql", String.class); - String msg = - sparkInterpreter.getZeppelinContext().showData(method.invoke(sqlc, effectiveSQL)); + String msg = sparkInterpreter.getZeppelinContext().showData( + method.invoke(sqlc, effectiveSQL)); sc.clearJobGroup(); return new InterpreterResult(Code.SUCCESS, msg); } catch (Exception e) { @@ -85,8 +85,8 @@ public class SparkSqlInterpreter extends Interpreter { throw new InterpreterException(e); } logger.error("Invocation target exception", e); - String msg = - e.getMessage() + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace"; + String msg = e.getMessage() + + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace"; return new InterpreterResult(Code.ERROR, msg); } } @@ -102,6 +102,7 @@ public class SparkSqlInterpreter extends Interpreter { return FormType.SIMPLE; } + @Override public int getProgress(InterpreterContext context) throws InterpreterException { return sparkInterpreter.getProgress(context); @@ -111,9 +112,8 @@ public class SparkSqlInterpreter extends Interpreter { public Scheduler getScheduler() { if (concurrentSQL()) { int maxConcurrency = Integer.parseInt(getProperty("zeppelin.spark.concurrentSQL", "10")); - return SchedulerFactory.singleton() - .createOrGetParallelScheduler( - SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency); + return SchedulerFactory.singleton().createOrGetParallelScheduler( + SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency); } else { // getSparkInterpreter() calls open() inside. // That means if SparkInterpreter is not opened, it'll wait until SparkInterpreter open. @@ -131,8 +131,8 @@ public class SparkSqlInterpreter extends Interpreter { } @Override - public List<InterpreterCompletion> completion( - String buf, int cursor, InterpreterContext interpreterContext) { + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { return null; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java index 0805b5a..b75deb8 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java @@ -19,7 +19,9 @@ package org.apache.zeppelin.spark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Provide reading comparing capability of spark version returned from SparkContext.version() */ +/** + * Provide reading comparing capability of spark version returned from SparkContext.version() + */ public class SparkVersion { private static final Logger logger = LoggerFactory.getLogger(SparkVersion.class); @@ -30,7 +32,7 @@ public class SparkVersion { public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1"); public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0"); - public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_6_0; + public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_6_0; public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_2_4_0; private int version; @@ -54,8 +56,8 @@ public class SparkVersion { // version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602) version = Integer.parseInt(String.format("%d%02d%02d", major, minor, patch)); } catch (Exception e) { - logger.error( - "Can not recognize Spark version " + versionString + ". Assume it's a future release", e); + logger.error("Can not recognize Spark version " + versionString + + ". Assume it's a future release", e); // assume it is future release version = 99999; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java index 9e7f3db..cd6c607 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java @@ -17,23 +17,26 @@ package org.apache.zeppelin.spark; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** Utility and helper functions for the Spark Interpreter */ +/** + * Utility and helper functions for the Spark Interpreter + */ class Utils { public static Logger logger = LoggerFactory.getLogger(Utils.class); private static final String SCALA_COMPILER_VERSION = evaluateScalaCompilerVersion(); static Object invokeMethod(Object o, String name) { - return invokeMethod(o, name, new Class[] {}, new Object[] {}); + return invokeMethod(o, name, new Class[]{}, new Object[]{}); } static Object invokeMethod(Object o, String name, Class<?>[] argTypes, Object[] params) { @@ -55,7 +58,7 @@ class Utils { } static Object invokeStaticMethod(Class<?> c, String name) { - return invokeStaticMethod(c, name, new Class[] {}, new Object[] {}); + return invokeStaticMethod(c, name, new Class[]{}, new Object[]{}); } static Class<?> findClass(String name) { @@ -75,14 +78,11 @@ class Utils { static Object instantiateClass(String name, Class<?>[] argTypes, Object[] params) { try { - Constructor<?> constructor = - Utils.class.getClassLoader().loadClass(name).getConstructor(argTypes); + Constructor<?> constructor = Utils.class.getClassLoader() + .loadClass(name).getConstructor(argTypes); return constructor.newInstance(params); - } catch (NoSuchMethodException - | ClassNotFoundException - | IllegalAccessException - | InstantiationException - | InvocationTargetException e) { + } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException | + InstantiationException | InvocationTargetException e) { logger.error(e.getMessage(), e); } return null; @@ -103,7 +103,7 @@ class Utils { static boolean isScala2_11() { return !isScala2_10(); } - + static boolean isCompilerAboveScala2_11_7() { if (isScala2_10() || SCALA_COMPILER_VERSION == null) { return false; @@ -125,8 +125,8 @@ class Utils { Properties p = new Properties(); Class<?> completionClass = findClass("scala.tools.nsc.interpreter.JLineCompletion"); if (completionClass != null) { - try (java.io.InputStream in = - completionClass.getClass().getResourceAsStream("/compiler.properties")) { + try (java.io.InputStream in = completionClass.getClass() + .getResourceAsStream("/compiler.properties")) { p.load(in); version = p.getProperty("version.number"); } catch (java.io.IOException e) { @@ -147,7 +147,7 @@ class Utils { return false; } } - + public static String buildJobGroupId(InterpreterContext context) { return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index b743641..71f3568 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -16,21 +16,26 @@ */ package org.apache.zeppelin.spark; -import java.io.*; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.exec.*; import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkRBackend; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterOutputListener; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** R repl interaction */ +import java.io.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * R repl interaction + */ public class ZeppelinR implements ExecuteResultHandler { private static Logger logger = LoggerFactory.getLogger(ZeppelinR.class); @@ -43,28 +48,32 @@ public class ZeppelinR implements ExecuteResultHandler { private PipedOutputStream input; private final String scriptPath; private final String libPath; - static Map<Integer, ZeppelinR> zeppelinR = - Collections.synchronizedMap(new HashMap<Integer, ZeppelinR>()); + static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap( + new HashMap<Integer, ZeppelinR>()); private InterpreterOutput initialOutput; private final int port; private boolean rScriptRunning; - /** To be notified R repl initialization */ + /** + * To be notified R repl initialization + */ boolean rScriptInitialized = false; - Integer rScriptInitializeNotifier = new Integer(0); - /** Request to R repl */ + /** + * Request to R repl + */ Request rRequestObject = null; - Integer rRequestNotifier = new Integer(0); /** * Request object * - * <p>type : "eval", "set", "get" stmt : statement to evaluate when type is "eval" key when type - * is "set" or "get" value : value object when type is "put" + * type : "eval", "set", "get" + * stmt : statement to evaluate when type is "eval" + * key when type is "set" or "get" + * value : value object when type is "put" */ public static class Request { String type; @@ -90,25 +99,20 @@ public class ZeppelinR implements ExecuteResultHandler { } } - /** Response from R repl */ + /** + * Response from R repl + */ Object rResponseValue = null; - boolean rResponseError = false; Integer rResponseNotifier = new Integer(0); /** * Create ZeppelinR instance - * * @param rCmdPath R repl commandline path * @param libPath sparkr library path */ - public ZeppelinR( - String rCmdPath, - String libPath, - int sparkRBackendPort, - SparkVersion sparkVersion, - int timeout, - SparkRInterpreter sparkRInterpreter) { + public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort, + SparkVersion sparkVersion, int timeout, SparkRInterpreter sparkRInterpreter) { this.rCmdPath = rCmdPath; this.libPath = libPath; this.sparkVersion = sparkVersion; @@ -125,7 +129,6 @@ public class ZeppelinR implements ExecuteResultHandler { /** * Start R repl - * * @throws IOException */ public void open() throws IOException, InterpreterException { @@ -161,6 +164,7 @@ public class ZeppelinR implements ExecuteResultHandler { executor.setStreamHandler(streamHandler); Map env = EnvironmentUtils.getProcEnvironment(); + initialOutput = new InterpreterOutput(null); outputStream.setInterpreterOutput(initialOutput); executor.execute(cmd, env, this); @@ -172,7 +176,6 @@ public class ZeppelinR implements ExecuteResultHandler { /** * Evaluate expression - * * @param expr * @return */ @@ -185,7 +188,6 @@ public class ZeppelinR implements ExecuteResultHandler { /** * assign value to key - * * @param key * @param value */ @@ -198,7 +200,6 @@ public class ZeppelinR implements ExecuteResultHandler { /** * get value of key - * * @param key * @return */ @@ -211,7 +212,6 @@ public class ZeppelinR implements ExecuteResultHandler { /** * get value of key, as a string - * * @param key * @return */ @@ -224,7 +224,6 @@ public class ZeppelinR implements ExecuteResultHandler { /** * Send request to r repl and return response - * * @return responseValue */ private Object request() throws RuntimeException, InterpreterException { @@ -264,16 +263,17 @@ public class ZeppelinR implements ExecuteResultHandler { } /** - * Wait until src/main/resources/R/zeppelin_sparkr.R is initialized and call onScriptInitialized() + * Wait until src/main/resources/R/zeppelin_sparkr.R is initialized + * and call onScriptInitialized() * * @throws InterpreterException */ private void waitForRScriptInitialized() throws InterpreterException { synchronized (rScriptInitializeNotifier) { long startTime = System.nanoTime(); - while (rScriptInitialized == false - && rScriptRunning - && System.nanoTime() - startTime < 10L * 1000 * 1000000) { + while (rScriptInitialized == false && + rScriptRunning && + System.nanoTime() - startTime < 10L * 1000 * 1000000) { try { rScriptInitializeNotifier.wait(1000); } catch (InterruptedException e) { @@ -297,7 +297,6 @@ public class ZeppelinR implements ExecuteResultHandler { /** * invoked by src/main/resources/R/zeppelin_sparkr.R - * * @return */ public Request getRequest() { @@ -318,7 +317,6 @@ public class ZeppelinR implements ExecuteResultHandler { /** * invoked by src/main/resources/R/zeppelin_sparkr.R - * * @param value * @param error */ @@ -330,7 +328,9 @@ public class ZeppelinR implements ExecuteResultHandler { } } - /** invoked by src/main/resources/R/zeppelin_sparkr.R */ + /** + * invoked by src/main/resources/R/zeppelin_sparkr.R + */ public void onScriptInitialized() { synchronized (rScriptInitializeNotifier) { rScriptInitialized = true; @@ -338,7 +338,9 @@ public class ZeppelinR implements ExecuteResultHandler { } } - /** Create R script in tmp dir */ + /** + * Create R script in tmp dir + */ private void createRScript() throws InterpreterException { ClassLoader classLoader = getClass().getClassLoader(); File out = new File(scriptPath); @@ -349,7 +351,9 @@ public class ZeppelinR implements ExecuteResultHandler { try { FileOutputStream outStream = new FileOutputStream(out); - IOUtils.copy(classLoader.getResourceAsStream("R/zeppelin_sparkr.R"), outStream); + IOUtils.copy( + classLoader.getResourceAsStream("R/zeppelin_sparkr.R"), + outStream); outStream.close(); } catch (IOException e) { throw new InterpreterException(e); @@ -358,7 +362,9 @@ public class ZeppelinR implements ExecuteResultHandler { logger.info("File {} created", scriptPath); } - /** Terminate this R repl */ + /** + * Terminate this R repl + */ public void close() { executor.getWatchdog().destroyProcess(); new File(scriptPath).delete(); @@ -366,8 +372,8 @@ public class ZeppelinR implements ExecuteResultHandler { } /** - * Get instance This method will be invoded from zeppelin_sparkr.R - * + * Get instance + * This method will be invoded from zeppelin_sparkr.R * @param hashcode * @return */ @@ -377,7 +383,6 @@ public class ZeppelinR implements ExecuteResultHandler { /** * Pass InterpreterOutput to capture the repl output - * * @param out */ public void setInterpreterOutput(InterpreterOutput out) { @@ -396,6 +401,7 @@ public class ZeppelinR implements ExecuteResultHandler { rScriptRunning = false; } + public static class SparkRInterpreterOutputStream extends InterpreterOutputStream { private SparkRInterpreter sparkRInterpreter; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java index a6c56fa..80ea03b 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java @@ -21,7 +21,9 @@ import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; -/** Contains the Spark and Zeppelin Contexts made available to SparkR. */ +/** + * Contains the Spark and Zeppelin Contexts made available to SparkR. + */ public class ZeppelinRContext { private static SparkContext sparkContext; private static SQLContext sqlContext; @@ -61,11 +63,7 @@ public class ZeppelinRContext { return sparkSession; } - public static void setJavaSparkContext(JavaSparkContext jsc) { - javaSparkContext = jsc; - } + public static void setJavaSparkContext(JavaSparkContext jsc) { javaSparkContext = jsc; } - public static JavaSparkContext getJavaSparkContext() { - return javaSparkContext; - } + public static JavaSparkContext getJavaSparkContext() { return javaSparkContext; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java index b5ace31..0235fc6 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java @@ -21,16 +21,18 @@ import java.io.File; import java.net.MalformedURLException; import java.util.LinkedList; import java.util.List; + import org.apache.zeppelin.dep.Booter; import org.apache.zeppelin.dep.Dependency; import org.apache.zeppelin.dep.Repository; + import org.sonatype.aether.RepositorySystem; import org.sonatype.aether.RepositorySystemSession; import org.sonatype.aether.artifact.Artifact; import org.sonatype.aether.collection.CollectRequest; import org.sonatype.aether.graph.DependencyFilter; -import org.sonatype.aether.repository.Authentication; import org.sonatype.aether.repository.RemoteRepository; +import org.sonatype.aether.repository.Authentication; import org.sonatype.aether.resolution.ArtifactResolutionException; import org.sonatype.aether.resolution.ArtifactResult; import org.sonatype.aether.resolution.DependencyRequest; @@ -40,7 +42,10 @@ import org.sonatype.aether.util.artifact.JavaScopes; import org.sonatype.aether.util.filter.DependencyFilterUtils; import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter; -/** */ + +/** + * + */ public class SparkDependencyContext { List<Dependency> dependencies = new LinkedList<>(); List<Repository> repositories = new LinkedList<>(); @@ -54,7 +59,7 @@ public class SparkDependencyContext { private List<RemoteRepository> additionalRepos = new LinkedList<>(); public SparkDependencyContext(String localRepoPath, String additionalRemoteRepository) { - session = Booter.newRepositorySystemSession(system, localRepoPath); + session = Booter.newRepositorySystemSession(system, localRepoPath); addRepoFromProperty(additionalRemoteRepository); } @@ -103,14 +108,13 @@ public class SparkDependencyContext { /** * fetch all artifacts - * * @return * @throws MalformedURLException * @throws ArtifactResolutionException * @throws DependencyResolutionException */ - public List<File> fetch() - throws MalformedURLException, DependencyResolutionException, ArtifactResolutionException { + public List<File> fetch() throws MalformedURLException, + DependencyResolutionException, ArtifactResolutionException { for (Dependency dep : dependencies) { if (!dep.isLocalFsArtifact()) { @@ -134,17 +138,17 @@ public class SparkDependencyContext { private List<ArtifactResult> fetchArtifactWithDep(Dependency dep) throws DependencyResolutionException, ArtifactResolutionException { - Artifact artifact = - new DefaultArtifact( - SparkDependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion())); + Artifact artifact = new DefaultArtifact( + SparkDependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion())); - DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE); - PatternExclusionsDependencyFilter exclusionFilter = - new PatternExclusionsDependencyFilter( - SparkDependencyResolver.inferScalaVersion(dep.getExclusions())); + DependencyFilter classpathFlter = DependencyFilterUtils + .classpathFilter(JavaScopes.COMPILE); + PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter( + SparkDependencyResolver.inferScalaVersion(dep.getExclusions())); CollectRequest collectRequest = new CollectRequest(); - collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact, JavaScopes.COMPILE)); + collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact, + JavaScopes.COMPILE)); collectRequest.addRepository(mavenCentral); collectRequest.addRepository(mavenLocal); @@ -161,9 +165,8 @@ public class SparkDependencyContext { collectRequest.addRepository(rr); } - DependencyRequest dependencyRequest = - new DependencyRequest( - collectRequest, DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter)); + DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, + DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter)); return system.resolveDependencies(session, dependencyRequest).getArtifactResults(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java index df10111..46224a8 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; + import org.apache.commons.lang.StringUtils; import org.apache.spark.SparkContext; import org.apache.zeppelin.dep.AbstractDependencyResolver; @@ -42,6 +43,7 @@ import org.sonatype.aether.util.artifact.DefaultArtifact; import org.sonatype.aether.util.artifact.JavaScopes; import org.sonatype.aether.util.filter.DependencyFilterUtils; import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter; + import scala.Some; import scala.collection.IndexedSeq; import scala.reflect.io.AbstractFile; @@ -50,30 +52,29 @@ import scala.tools.nsc.backend.JavaPlatform; import scala.tools.nsc.util.ClassPath; import scala.tools.nsc.util.MergedClassPath; -/** Deps resolver. Add new dependencies from mvn repo (at runtime) to Spark interpreter group. */ +/** + * Deps resolver. + * Add new dependencies from mvn repo (at runtime) to Spark interpreter group. + */ public class SparkDependencyResolver extends AbstractDependencyResolver { Logger logger = LoggerFactory.getLogger(SparkDependencyResolver.class); private Global global; private ClassLoader runtimeClassLoader; private SparkContext sc; - private final String[] exclusions = - new String[] { - "org.scala-lang:scala-library", - "org.scala-lang:scala-compiler", - "org.scala-lang:scala-reflect", - "org.scala-lang:scalap", - "org.apache.zeppelin:zeppelin-zengine", - "org.apache.zeppelin:zeppelin-spark", - "org.apache.zeppelin:zeppelin-server" - }; - - public SparkDependencyResolver( - Global global, - ClassLoader runtimeClassLoader, - SparkContext sc, - String localRepoPath, - String additionalRemoteRepository) { + private final String[] exclusions = new String[] {"org.scala-lang:scala-library", + "org.scala-lang:scala-compiler", + "org.scala-lang:scala-reflect", + "org.scala-lang:scalap", + "org.apache.zeppelin:zeppelin-zengine", + "org.apache.zeppelin:zeppelin-spark", + "org.apache.zeppelin:zeppelin-server"}; + + public SparkDependencyResolver(Global global, + ClassLoader runtimeClassLoader, + SparkContext sc, + String localRepoPath, + String additionalRemoteRepository) { super(localRepoPath); this.global = global; this.runtimeClassLoader = runtimeClassLoader; @@ -98,8 +99,8 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { } } - private void updateCompilerClassPath(URL[] urls) - throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException, + IllegalArgumentException, InvocationTargetException { JavaPlatform platform = (JavaPlatform) global.platform(); MergedClassPath<AbstractFile> newClassPath = mergeUrlsIntoClassPath(platform, urls); @@ -119,15 +120,15 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { } // Reload all jars specified into our compiler - global.invalidateClassPathEntries( - scala.collection.JavaConversions.asScalaBuffer(classPaths).toList()); + global.invalidateClassPathEntries(scala.collection.JavaConversions.asScalaBuffer(classPaths) + .toList()); } // Until spark 1.1.x // check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7 - private void updateRuntimeClassPath_1_x(URL[] urls) - throws SecurityException, IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException { + private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException, + IllegalAccessException, IllegalArgumentException, + InvocationTargetException, NoSuchMethodException { Method addURL; addURL = runtimeClassLoader.getClass().getDeclaredMethod("addURL", new Class[] {URL.class}); addURL.setAccessible(true); @@ -136,9 +137,9 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { } } - private void updateRuntimeClassPath_2_x(URL[] urls) - throws SecurityException, IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException { + private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException, + IllegalAccessException, IllegalArgumentException, + InvocationTargetException, NoSuchMethodException { Method addURL; addURL = runtimeClassLoader.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class}); addURL.setAccessible(true); @@ -177,17 +178,17 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { } } - return new MergedClassPath( - scala.collection.JavaConversions.asScalaBuffer(cp).toIndexedSeq(), + return new MergedClassPath(scala.collection.JavaConversions.asScalaBuffer(cp).toIndexedSeq(), platform.classPath().context()); } - public List<String> load(String artifact, boolean addSparkContext) throws Exception { + public List<String> load(String artifact, + boolean addSparkContext) throws Exception { return load(artifact, new LinkedList<String>(), addSparkContext); } - public List<String> load(String artifact, Collection<String> excludes, boolean addSparkContext) - throws Exception { + public List<String> load(String artifact, Collection<String> excludes, + boolean addSparkContext) throws Exception { if (StringUtils.isBlank(artifact)) { // Should throw here throw new RuntimeException("Invalid artifact to load"); @@ -221,8 +222,8 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { } } - private List<String> loadFromMvn( - String artifact, Collection<String> excludes, boolean addSparkContext) throws Exception { + private List<String> loadFromMvn(String artifact, Collection<String> excludes, + boolean addSparkContext) throws Exception { List<String> loadedLibs = new LinkedList<>(); Collection<String> allExclusions = new LinkedList<>(); allExclusions.addAll(excludes); @@ -246,21 +247,14 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { List<URL> newClassPathList = new LinkedList<>(); List<File> files = new LinkedList<>(); for (ArtifactResult artifactResult : listOfArtifact) { - logger.info( - "Load " - + artifactResult.getArtifact().getGroupId() - + ":" - + artifactResult.getArtifact().getArtifactId() - + ":" - + artifactResult.getArtifact().getVersion()); + logger.info("Load " + artifactResult.getArtifact().getGroupId() + ":" + + artifactResult.getArtifact().getArtifactId() + ":" + + artifactResult.getArtifact().getVersion()); newClassPathList.add(artifactResult.getArtifact().getFile().toURI().toURL()); files.add(artifactResult.getArtifact().getFile()); - loadedLibs.add( - artifactResult.getArtifact().getGroupId() - + ":" - + artifactResult.getArtifact().getArtifactId() - + ":" - + artifactResult.getArtifact().getVersion()); + loadedLibs.add(artifactResult.getArtifact().getGroupId() + ":" + + artifactResult.getArtifact().getArtifactId() + ":" + + artifactResult.getArtifact().getVersion()); } global.new Run(); @@ -287,8 +281,8 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { * @throws Exception */ @Override - public List<ArtifactResult> getArtifactsWithDep(String dependency, Collection<String> excludes) - throws Exception { + public List<ArtifactResult> getArtifactsWithDep(String dependency, + Collection<String> excludes) throws Exception { Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency)); DependencyFilter classpathFilter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE); PatternExclusionsDependencyFilter exclusionFilter = @@ -302,9 +296,8 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { collectRequest.addRepository(repo); } } - DependencyRequest dependencyRequest = - new DependencyRequest( - collectRequest, DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter)); + DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, + DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter)); return system.resolveDependencies(session, dependencyRequest).getArtifactResults(); } @@ -347,7 +340,7 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { } } - String[] version = scala.util.Properties.versionNumberString().split("[.]"); + String [] version = scala.util.Properties.versionNumberString().split("[.]"); String scalaVersion = version[0] + "." + version[1]; return groupId + ":" + artifactId + "_" + scalaVersion + versionSep + restOfthem; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java index 3137ce7..104a675 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java @@ -17,11 +17,6 @@ package org.apache.zeppelin.spark; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.Properties; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -34,9 +29,16 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.IOException; +import java.util.LinkedList; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + public class DepInterpreterTest { - @Rule public TemporaryFolder tmpDir = new TemporaryFolder(); + @Rule + public TemporaryFolder tmpDir = new TemporaryFolder(); private DepInterpreter dep; private InterpreterContext context; @@ -44,9 +46,7 @@ public class DepInterpreterTest { private Properties getTestProperties() throws IOException { Properties p = new Properties(); p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath()); - p.setProperty( - "zeppelin.dep.additionalRemoteRepository", - "spark-packages,http://dl.bintray.com/spark-packages/maven,false;"); + p.setProperty("zeppelin.dep.additionalRemoteRepository", "spark-packages,http://dl.bintray.com/spark-packages/maven,false;"); return p; } @@ -63,8 +63,8 @@ public class DepInterpreterTest { intpGroup.get("note").add(dep); dep.setInterpreterGroup(intpGroup); - context = InterpreterContext.builder().build(); - ; + context = InterpreterContext.builder() + .build();; } @After @@ -75,8 +75,7 @@ public class DepInterpreterTest { @Test public void testDefault() throws InterpreterException { dep.getDependencyContext().reset(); - InterpreterResult ret = - dep.interpret("z.load(\"org.apache.commons:commons-csv:1.1\")", context); + InterpreterResult ret = dep.interpret("z.load(\"org.apache.commons:commons-csv:1.1\")", context); assertEquals(Code.SUCCESS, ret.code()); assertEquals(1, dep.getDependencyContext().getFiles().size());
