http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
new file mode 100644
index 0000000..3ef4fe7
--- /dev/null
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark
+
+
+import java.io.File
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.tools.nsc.interpreter.Completion.ScalaCompleter
+import scala.util.control.NonFatal
+
+/**
+ * Base class for the different Scala versions of SparkInterpreter. It should be
+ * binary compatible across multiple Scala versions.
+ *
+ * @param conf     the SparkConf used to create the SparkContext / SparkSession
+ * @param depFiles dependency files (jars and other files) to distribute to the cluster
+ */
+abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
+                                         val depFiles: java.util.List[String]) {
+
+  protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+
+  private val isTest = conf.getBoolean("zeppelin.spark.test", false)
+
+  protected var sc: SparkContext = _
+
+  protected var sqlContext: SQLContext = _
+
+  protected var sparkSession: Object = _
+
+  protected var sparkHttpServer: Object = _
+
+  protected var sparkUrl: String = _
+
+  protected var scalaCompleter: ScalaCompleter = _
+
+  protected val interpreterOutput: InterpreterOutputStream
+
+  protected def open(): Unit = {
+    /* Required for scoped mode.
+     * In scoped mode, multiple scala compilers (repls) generate classes in the same directory.
+     * Class names are not randomly generated and look like '$line12.$read$$iw$$iw'.
+     * Therefore a generated class can conflict with (overwrite) a class generated by
+     * another repl.
+     *
+     * To prevent generated class name conflicts,
+     * change the prefix of the generated class names for each scala compiler (repl) instance.
+     *
+     * In Spark 2.x, the REPL generated wrapper class name should be compatible with the pattern
+     * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
+     *
+     * As hashCode() can return a negative integer value and the minus character '-' is invalid
+     * in a package name, we change it to the numeric character '0', which still conforms to the regexp.
+ * + */ + System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0')) + } + + protected def interpret(code: String, context: InterpreterContext): InterpreterResult + + protected def interpret(code: String): InterpreterResult = interpret(code, null) + + protected def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result + + protected def completion(buf: String, + cursor: Int, + context: InterpreterContext): java.util.List[InterpreterCompletion] = { + val completions = scalaCompleter.complete(buf, cursor).candidates + .map(e => new InterpreterCompletion(e, e, null)) + scala.collection.JavaConversions.seqAsJavaList(completions) + } + + protected def getProgress(jobGroup: String, context: InterpreterContext): Int = { + val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup) + val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) } + val stages = jobs.flatMap { job => + job.stageIds().flatMap(sc.statusTracker.getStageInfo) + } + + val taskCount = stages.map(_.numTasks).sum + val completedTaskCount = stages.map(_.numCompletedTasks).sum + if (taskCount == 0) { + 0 + } else { + (100 * completedTaskCount.toDouble / taskCount).toInt + } + } + + protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit + + // for use in java side + protected def bind(name: String, + tpe: String, + value: Object, + modifier: java.util.List[String]): Unit = + bind(name, tpe, value, modifier.asScala.toList) + + protected def close(): Unit = { + if (sc != null) { + sc.stop() + } + if (sparkHttpServer != null) { + sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer) + } + sc = null + sqlContext = null + if (sparkSession != null) { + sparkSession.getClass.getMethod("stop").invoke(sparkSession) + sparkSession = null + } + + } + + protected def createSparkContext(): Unit = { + if (isSparkSessionPresent()) { + spark2CreateContext() + } else { + spark1CreateContext() + } + } + + private def spark1CreateContext(): Unit = { + this.sc = SparkContext.getOrCreate(conf) + if (!isTest) { + interpreterOutput.write("Created SparkContext.\n".getBytes()) + } + getUserFiles().foreach(file => sc.addFile(file)) + + sc.getClass.getMethod("ui").invoke(sc).asInstanceOf[Option[_]] match { + case Some(webui) => + sparkUrl = webui.getClass.getMethod("appUIAddress").invoke(webui).asInstanceOf[String] + case None => + } + + val hiveSiteExisted: Boolean = + Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null + val hiveEnabled = conf.getBoolean("spark.useHiveContext", false) + if (hiveEnabled && hiveSiteExisted) { + sqlContext = Class.forName("org.apache.spark.sql.hive.HiveContext") + .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext] + if (!isTest) { + interpreterOutput.write("Created sql context (with Hive support).\n".getBytes()) + } + } else { + if (hiveEnabled && !hiveSiteExisted && !isTest) { + interpreterOutput.write(("spark.useHiveContext is set as true but no hive-site.xml" + + " is found in classpath, so zeppelin will fallback to SQLContext.\n").getBytes()) + } + sqlContext = Class.forName("org.apache.spark.sql.SQLContext") + .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext] + if (!isTest) { + interpreterOutput.write("Created sql context.\n".getBytes()) + } + } + + bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient""")) + bind("sqlContext", sqlContext.getClass.getCanonicalName, sqlContext, List("""@transient""")) + + interpret("import 
org.apache.spark.SparkContext._") + interpret("import sqlContext.implicits._") + interpret("import sqlContext.sql") + interpret("import org.apache.spark.sql.functions._") + } + + private def spark2CreateContext(): Unit = { + val sparkClz = Class.forName("org.apache.spark.sql.SparkSession$") + val sparkObj = sparkClz.getField("MODULE$").get(null) + + val builderMethod = sparkClz.getMethod("builder") + val builder = builderMethod.invoke(sparkObj) + builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf) + + if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive" + || conf.get("spark.useHiveContext", "false").toLowerCase == "true") { + val hiveSiteExisted: Boolean = + Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null + val hiveClassesPresent = + sparkClz.getMethod("hiveClassesArePresent").invoke(sparkObj).asInstanceOf[Boolean] + if (hiveSiteExisted && hiveClassesPresent) { + builder.getClass.getMethod("enableHiveSupport").invoke(builder) + sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder) + if (!isTest) { + interpreterOutput.write("Created Spark session (with Hive support).\n".getBytes()) + } + } else { + if (!hiveClassesPresent && !isTest) { + interpreterOutput.write( + "Hive support can not be enabled because spark is not built with hive\n".getBytes) + } + if (!hiveSiteExisted && !isTest) { + interpreterOutput.write( + "Hive support can not be enabled because no hive-site.xml found\n".getBytes) + } + sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder) + if (!isTest) { + interpreterOutput.write("Created Spark session.\n".getBytes()) + } + } + } else { + sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder) + if (!isTest) { + interpreterOutput.write("Created Spark session.\n".getBytes()) + } + } + + sc = sparkSession.getClass.getMethod("sparkContext").invoke(sparkSession) + .asInstanceOf[SparkContext] + getUserFiles().foreach(file => sc.addFile(file)) + sqlContext = sparkSession.getClass.getMethod("sqlContext").invoke(sparkSession) + .asInstanceOf[SQLContext] + sc.getClass.getMethod("uiWebUrl").invoke(sc).asInstanceOf[Option[String]] match { + case Some(url) => sparkUrl = url + case None => + } + + bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient""")) + bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient""")) + bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, List("""@transient""")) + + interpret("import org.apache.spark.SparkContext._") + interpret("import spark.implicits._") + interpret("import spark.sql") + interpret("import org.apache.spark.sql.functions._") + } + + private def isSparkSessionPresent(): Boolean = { + try { + Class.forName("org.apache.spark.sql.SparkSession") + true + } catch { + case _: ClassNotFoundException | _: NoClassDefFoundError => false + } + } + + protected def getField(obj: Object, name: String): Object = { + val field = obj.getClass.getField(name) + field.setAccessible(true) + field.get(obj) + } + + protected def getDeclareField(obj: Object, name: String): Object = { + val field = obj.getClass.getDeclaredField(name) + field.setAccessible(true) + field.get(obj) + } + + protected def setDeclaredField(obj: Object, name: String, value: Object): Unit = { + val field = obj.getClass.getDeclaredField(name) + field.setAccessible(true) + field.set(obj, value) + } + + protected def callMethod(obj: Object, name: String): Object = { + callMethod(obj, name, 
Array.empty[Class[_]], Array.empty[Object]) + } + + protected def callMethod(obj: Object, name: String, + parameterTypes: Array[Class[_]], + parameters: Array[Object]): Object = { + val method = obj.getClass.getMethod(name, parameterTypes: _ *) + method.setAccessible(true) + method.invoke(obj, parameters: _ *) + } + + protected def startHttpServer(outputDir: File): Option[(Object, String)] = { + try { + val httpServerClass = Class.forName("org.apache.spark.HttpServer") + val securityManager = { + val constructor = Class.forName("org.apache.spark.SecurityManager") + .getConstructor(classOf[SparkConf]) + constructor.setAccessible(true) + constructor.newInstance(conf).asInstanceOf[Object] + } + val httpServerConstructor = httpServerClass + .getConstructor(classOf[SparkConf], + classOf[File], + Class.forName("org.apache.spark.SecurityManager"), + classOf[Int], + classOf[String]) + httpServerConstructor.setAccessible(true) + // Create Http Server + val port = conf.getInt("spark.replClassServer.port", 0) + val server = httpServerConstructor + .newInstance(conf, outputDir, securityManager, new Integer(port), "HTTP server") + .asInstanceOf[Object] + + // Start Http Server + val startMethod = server.getClass.getMethod("start") + startMethod.setAccessible(true) + startMethod.invoke(server) + + // Get uri of this Http Server + val uriMethod = server.getClass.getMethod("uri") + uriMethod.setAccessible(true) + val uri = uriMethod.invoke(server).asInstanceOf[String] + Some((server, uri)) + } catch { + // Spark 2.0+ removed HttpServer, so return null instead. + case NonFatal(e) => + None + } + } + + protected def getUserJars(): Seq[String] = { + val sparkJars = conf.getOption("spark.jars").map(_.split(",")) + .map(_.filter(_.nonEmpty)).toSeq.flatten + val depJars = depFiles.asScala.filter(_.endsWith(".jar")) + val result = sparkJars ++ depJars + conf.set("spark.jars", result.mkString(",")) + result + } + + protected def getUserFiles(): Seq[String] = { + depFiles.asScala.filter(!_.endsWith(".jar")) + } +}
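
Note on spark2CreateContext above: every Spark 2.x call goes through reflection so that this spark-scala-parent module keeps no compile-time dependency on the Spark 2.x API and can still be built against Spark 1.x. As a rough, non-authoritative sketch (not part of the patch), the reflective calls amount to the following when written directly against the Spark 2.x API; the enableHive flag stands in for the hive-site.xml / hiveClassesArePresent checks performed reflectively in the code above:

    // Minimal, non-reflective sketch of spark2CreateContext, for illustration only.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession

    def createSessionDirectly(conf: SparkConf, enableHive: Boolean): (SparkSession, SparkContext) = {
      var builder = SparkSession.builder().config(conf)
      if (enableHive) {
        // the real code only enables Hive when hive-site.xml and the Hive classes are present
        builder = builder.enableHiveSupport()
      }
      val spark = builder.getOrCreate()
      (spark, spark.sparkContext)
    }

The production code then binds spark, sc and sqlContext into the repl and runs the imports shown in the diff (spark.implicits._, spark.sql, etc.).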
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java deleted file mode 100644 index 6b1f0a9..0000000 --- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java +++ /dev/null @@ -1,363 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.PrintStream; -import java.io.PrintWriter; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; - -import org.apache.commons.lang.StringUtils; -import org.apache.spark.repl.SparkILoop; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.WrappedInterpreter; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.spark.dep.SparkDependencyContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.sonatype.aether.resolution.ArtifactResolutionException; -import org.sonatype.aether.resolution.DependencyResolutionException; - -import scala.Console; -import scala.None; -import scala.Some; -import scala.collection.convert.WrapAsJava$; -import scala.collection.JavaConversions; -import scala.tools.nsc.Settings; -import scala.tools.nsc.interpreter.Completion.Candidates; -import scala.tools.nsc.interpreter.Completion.ScalaCompleter; -import scala.tools.nsc.interpreter.IMain; -import scala.tools.nsc.interpreter.Results; -import scala.tools.nsc.settings.MutableSettings.BooleanSetting; -import scala.tools.nsc.settings.MutableSettings.PathSetting; - - -/** - * DepInterpreter downloads dependencies and pass them when SparkInterpreter initialized. 
- * It extends SparkInterpreter but does not create sparkcontext - * - */ -public class DepInterpreter extends Interpreter { - /** - * intp - org.apache.spark.repl.SparkIMain (scala 2.10) - * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11) - */ - private Object intp; - private ByteArrayOutputStream out; - private SparkDependencyContext depc; - /** - * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) - */ - private Object completer; - private SparkILoop interpreter; - static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class); - - public DepInterpreter(Properties property) { - super(property); - } - - public SparkDependencyContext getDependencyContext() { - return depc; - } - - public static String getSystemDefault( - String envName, - String propertyName, - String defaultValue) { - - if (envName != null && !envName.isEmpty()) { - String envValue = System.getenv().get(envName); - if (envValue != null) { - return envValue; - } - } - - if (propertyName != null && !propertyName.isEmpty()) { - String propValue = System.getProperty(propertyName); - if (propValue != null) { - return propValue; - } - } - return defaultValue; - } - - @Override - public void close() { - if (intp != null) { - Utils.invokeMethod(intp, "close"); - } - } - - @Override - public void open() { - out = new ByteArrayOutputStream(); - createIMain(); - } - - - private void createIMain() { - Settings settings = new Settings(); - URL[] urls = getClassloaderUrls(); - - // set classpath for scala compiler - PathSetting pathSettings = settings.classpath(); - String classpath = ""; - List<File> paths = currentClassPath(); - for (File f : paths) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += f.getAbsolutePath(); - } - - if (urls != null) { - for (URL u : urls) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += u.getFile(); - } - } - - pathSettings.v_$eq(classpath); - settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); - - // set classloader for scala compiler - settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread() - .getContextClassLoader())); - - BooleanSetting b = (BooleanSetting) settings.usejavacp(); - b.v_$eq(true); - settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - - interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out)); - interpreter.settings_$eq(settings); - - interpreter.createInterpreter(); - - - intp = Utils.invokeMethod(interpreter, "intp"); - - if (Utils.isScala2_10()) { - Utils.invokeMethod(intp, "setContextClassLoader"); - Utils.invokeMethod(intp, "initializeSynchronous"); - } - - depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"), - getProperty("zeppelin.dep.additionalRemoteRepository")); - if (Utils.isScala2_10()) { - completer = Utils.instantiateClass( - "org.apache.spark.repl.SparkJLineCompletion", - new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")}, - new Object[]{intp}); - } - interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - Map<String, Object> binder; - if (Utils.isScala2_10()) { - binder = (Map<String, Object>) getValue("_binder"); - } else { - binder = (Map<String, Object>) getLastObject(); - } - binder.put("depc", depc); - - interpret("@transient val z = " - + "_binder.get(\"depc\")" - + ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]"); - - } - - private Results.Result interpret(String line) { 
- return (Results.Result) Utils.invokeMethod( - intp, - "interpret", - new Class[] {String.class}, - new Object[] {line}); - } - - public Object getValue(String name) { - Object ret = Utils.invokeMethod( - intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name}); - if (ret instanceof None) { - return null; - } else if (ret instanceof Some) { - return ((Some) ret).get(); - } else { - return ret; - } - } - - public Object getLastObject() { - IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest"); - Object obj = r.lineRep().call("$result", - JavaConversions.asScalaBuffer(new LinkedList<>())); - return obj; - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - PrintStream printStream = new PrintStream(out); - Console.setOut(printStream); - out.reset(); - - SparkInterpreter sparkInterpreter = getSparkInterpreter(); - - if (sparkInterpreter != null && sparkInterpreter.isSparkContextInitialized()) { - return new InterpreterResult(Code.ERROR, - "Must be used before SparkInterpreter (%spark) initialized\n" + - "Hint: put this paragraph before any Spark code and " + - "restart Zeppelin/Interpreter" ); - } - - scala.tools.nsc.interpreter.Results.Result ret = interpret(st); - Code code = getResultCode(ret); - - try { - depc.fetch(); - } catch (MalformedURLException | DependencyResolutionException - | ArtifactResolutionException e) { - LOGGER.error("Exception in DepInterpreter while interpret ", e); - return new InterpreterResult(Code.ERROR, e.toString()); - } - - if (code == Code.INCOMPLETE) { - return new InterpreterResult(code, "Incomplete expression"); - } else if (code == Code.ERROR) { - return new InterpreterResult(code, out.toString()); - } else { - return new InterpreterResult(code, out.toString()); - } - } - - private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { - if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { - return Code.SUCCESS; - } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) { - return Code.INCOMPLETE; - } else { - return Code.ERROR; - } - } - - @Override - public void cancel(InterpreterContext context) { - } - - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - if (Utils.isScala2_10()) { - ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer"); - Candidates ret = c.complete(buf, cursor); - - List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates()); - List<InterpreterCompletion> completions = new LinkedList<>(); - - for (String candidate : candidates) { - completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY)); - } - - return completions; - } else { - return new LinkedList<>(); - } - } - - private List<File> currentClassPath() { - List<File> paths = classPath(Thread.currentThread().getContextClassLoader()); - String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); - if (cps != null) { - for (String cp : cps) { - paths.add(new File(cp)); - } - } - return paths; - } - - private List<File> classPath(ClassLoader cl) { - List<File> paths = new LinkedList<>(); - if (cl == null) { - return paths; - } - - if (cl instanceof URLClassLoader) { - URLClassLoader ucl = (URLClassLoader) cl; - URL[] urls = ucl.getURLs(); - if (urls 
!= null) { - for (URL url : urls) { - paths.add(new File(url.getFile())); - } - } - } - return paths; - } - - private SparkInterpreter getSparkInterpreter() { - InterpreterGroup intpGroup = getInterpreterGroup(); - if (intpGroup == null) { - return null; - } - - Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); - if (p == null) { - return null; - } - - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - return (SparkInterpreter) p; - } - - @Override - public Scheduler getScheduler() { - SparkInterpreter sparkInterpreter = getSparkInterpreter(); - if (sparkInterpreter != null) { - return getSparkInterpreter().getScheduler(); - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java deleted file mode 100644 index a050569..0000000 --- a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.LazyOpenInterpreter; -import org.apache.zeppelin.interpreter.WrappedInterpreter; -import org.apache.zeppelin.python.IPythonInterpreter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Map; -import java.util.Properties; - -/** - * PySparkInterpreter which use IPython underlying. - */ -public class IPySparkInterpreter extends IPythonInterpreter { - - private static final Logger LOGGER = LoggerFactory.getLogger(IPySparkInterpreter.class); - - private SparkInterpreter sparkInterpreter; - - public IPySparkInterpreter(Properties property) { - super(property); - } - - @Override - public void open() throws InterpreterException { - setProperty("zeppelin.python", - PySparkInterpreter.getPythonExec(getProperties())); - sparkInterpreter = getSparkInterpreter(); - SparkConf conf = sparkInterpreter.getSparkContext().getConf(); - // only set PYTHONPATH in local or yarn-client mode. - // yarn-cluster will setup PYTHONPATH automatically. 
- if (!conf.get("spark.submit.deployMode").equals("cluster")) { - setAdditionalPythonPath(PythonUtils.sparkPythonPath()); - setAddBulitinPy4j(false); - } - setAdditionalPythonInitFile("python/zeppelin_ipyspark.py"); - super.open(); - } - - @Override - protected Map<String, String> setupIPythonEnv() throws IOException { - Map<String, String> env = super.setupIPythonEnv(); - // set PYSPARK_PYTHON - SparkConf conf = sparkInterpreter.getSparkContext().getConf(); - if (conf.contains("spark.pyspark.python")) { - env.put("PYSPARK_PYTHON", conf.get("spark.pyspark.python")); - } - return env; - } - - private SparkInterpreter getSparkInterpreter() throws InterpreterException { - LazyOpenInterpreter lazy = null; - SparkInterpreter spark = null; - Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); - - while (p instanceof WrappedInterpreter) { - if (p instanceof LazyOpenInterpreter) { - lazy = (LazyOpenInterpreter) p; - } - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - spark = (SparkInterpreter) p; - - if (lazy != null) { - lazy.open(); - } - return spark; - } - - @Override - public void cancel(InterpreterContext context) { - super.cancel(context); - sparkInterpreter.cancel(context); - } - - @Override - public void close() { - super.close(); - if (sparkInterpreter != null) { - sparkInterpreter.close(); - } - } - - @Override - public int getProgress(InterpreterContext context) { - return sparkInterpreter.getProgress(context); - } - - public boolean isSpark2() { - return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0); - } - - public JavaSparkContext getJavaSparkContext() { - return sparkInterpreter.getJavaSparkContext(); - } - - public Object getSQLContext() { - return sparkInterpreter.getSQLContext(); - } - - public Object getSparkSession() { - return sparkInterpreter.getSparkSession(); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java deleted file mode 100644 index 47ffe14..0000000 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ /dev/null @@ -1,745 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.spark; - -import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.net.MalformedURLException; -import java.net.ServerSocket; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.commons.compress.utils.IOUtils; -import org.apache.commons.exec.CommandLine; -import org.apache.commons.exec.DefaultExecutor; -import org.apache.commons.exec.ExecuteException; -import org.apache.commons.exec.ExecuteResultHandler; -import org.apache.commons.exec.ExecuteWatchdog; -import org.apache.commons.exec.PumpStreamHandler; -import org.apache.commons.exec.environment.EnvironmentUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SQLContext; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; -import org.apache.zeppelin.spark.dep.SparkDependencyContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -import py4j.GatewayServer; - -/** - * - */ -public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class); - private GatewayServer gatewayServer; - private DefaultExecutor executor; - private int port; - private InterpreterOutputStream outputStream; - private BufferedWriter ins; - private PipedInputStream in; - private ByteArrayOutputStream input; - private String scriptPath; - boolean pythonscriptRunning = false; - private static final int MAX_TIMEOUT_SEC = 10; - private long pythonPid; - - private IPySparkInterpreter iPySparkInterpreter; - - public PySparkInterpreter(Properties property) { - super(property); - - pythonPid = -1; - try { - File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py"); - scriptPath = scriptFile.getAbsolutePath(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void createPythonScript() throws InterpreterException { - ClassLoader classLoader = getClass().getClassLoader(); - File out = new File(scriptPath); - - if (out.exists() && out.isDirectory()) { - throw new InterpreterException("Can't create python script " + out.getAbsolutePath()); - } - - try { - FileOutputStream outStream = new FileOutputStream(out); - IOUtils.copy( - classLoader.getResourceAsStream("python/zeppelin_pyspark.py"), - outStream); - outStream.close(); - } catch (IOException e) { - throw new InterpreterException(e); - } - - LOGGER.info("File {} created", scriptPath); - } - - @Override - public void open() throws InterpreterException { - // try IPySparkInterpreter first - iPySparkInterpreter = getIPySparkInterpreter(); - if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true") && - StringUtils.isEmpty( - iPySparkInterpreter.checkIPythonPrerequisite(getPythonExec(getProperties())))) { - try { - iPySparkInterpreter.open(); - if (InterpreterContext.get() != 
null) { - // don't print it when it is in testing, just for easy output check in test. - InterpreterContext.get().out.write(("IPython is available, " + - "use IPython for PySparkInterpreter\n") - .getBytes()); - } - LOGGER.info("Use IPySparkInterpreter to replace PySparkInterpreter"); - return; - } catch (Exception e) { - LOGGER.warn("Fail to open IPySparkInterpreter", e); - } - } - iPySparkInterpreter = null; - if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) { - // don't print it when it is in testing, just for easy output check in test. - try { - InterpreterContext.get().out.write(("IPython is not available, " + - "use the native PySparkInterpreter\n") - .getBytes()); - } catch (IOException e) { - LOGGER.warn("Fail to write InterpreterOutput", e); - } - } - - // Add matplotlib display hook - InterpreterGroup intpGroup = getInterpreterGroup(); - if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) { - registerHook(HookType.POST_EXEC_DEV, "__zeppelin__._displayhook()"); - } - DepInterpreter depInterpreter = getDepInterpreter(); - - // load libraries from Dependency Interpreter - URL [] urls = new URL[0]; - List<URL> urlList = new LinkedList<>(); - - if (depInterpreter != null) { - SparkDependencyContext depc = depInterpreter.getDependencyContext(); - if (depc != null) { - List<File> files = depc.getFiles(); - if (files != null) { - for (File f : files) { - try { - urlList.add(f.toURI().toURL()); - } catch (MalformedURLException e) { - LOGGER.error("Error", e); - } - } - } - } - } - - String localRepo = getProperty("zeppelin.interpreter.localRepo"); - if (localRepo != null) { - File localRepoDir = new File(localRepo); - if (localRepoDir.exists()) { - File[] files = localRepoDir.listFiles(); - if (files != null) { - for (File f : files) { - try { - urlList.add(f.toURI().toURL()); - } catch (MalformedURLException e) { - LOGGER.error("Error", e); - } - } - } - } - } - - urls = urlList.toArray(urls); - ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); - try { - URLClassLoader newCl = new URLClassLoader(urls, oldCl); - Thread.currentThread().setContextClassLoader(newCl); - createGatewayServerAndStartScript(); - } catch (Exception e) { - LOGGER.error("Error", e); - throw new InterpreterException(e); - } finally { - Thread.currentThread().setContextClassLoader(oldCl); - } - } - - private Map setupPySparkEnv() throws IOException, InterpreterException { - Map env = EnvironmentUtils.getProcEnvironment(); - - // only set PYTHONPATH in local or yarn-client mode. - // yarn-cluster will setup PYTHONPATH automatically. 
- SparkConf conf = getSparkConf(); - if (!conf.get("spark.submit.deployMode", "client").equals("cluster")) { - if (!env.containsKey("PYTHONPATH")) { - env.put("PYTHONPATH", PythonUtils.sparkPythonPath()); - } else { - env.put("PYTHONPATH", PythonUtils.sparkPythonPath()); - } - } - - // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT - // also, add all packages to PYTHONPATH since there might be transitive dependencies - if (SparkInterpreter.useSparkSubmit() && - !getSparkInterpreter().isYarnMode()) { - - String sparkSubmitJars = getSparkConf().get("spark.jars").replace(",", ":"); - - if (!"".equals(sparkSubmitJars)) { - env.put("PYTHONPATH", env.get("PYTHONPATH") + sparkSubmitJars); - } - } - - LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH")); - - // set PYSPARK_PYTHON - if (getSparkConf().contains("spark.pyspark.python")) { - env.put("PYSPARK_PYTHON", getSparkConf().get("spark.pyspark.python")); - } - return env; - } - - // Run python shell - // Choose python in the order of - // PYSPARK_DRIVER_PYTHON > PYSPARK_PYTHON > zeppelin.pyspark.python - public static String getPythonExec(Properties properties) { - String pythonExec = properties.getProperty("zeppelin.pyspark.python", "python"); - if (System.getenv("PYSPARK_PYTHON") != null) { - pythonExec = System.getenv("PYSPARK_PYTHON"); - } - if (System.getenv("PYSPARK_DRIVER_PYTHON") != null) { - pythonExec = System.getenv("PYSPARK_DRIVER_PYTHON"); - } - return pythonExec; - } - - private void createGatewayServerAndStartScript() throws InterpreterException { - // create python script - createPythonScript(); - - port = findRandomOpenPortOnAllLocalInterfaces(); - - gatewayServer = new GatewayServer(this, port); - gatewayServer.start(); - - String pythonExec = getPythonExec(getProperties()); - LOGGER.info("pythonExec: " + pythonExec); - CommandLine cmd = CommandLine.parse(pythonExec); - cmd.addArgument(scriptPath, false); - cmd.addArgument(Integer.toString(port), false); - cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false); - executor = new DefaultExecutor(); - outputStream = new InterpreterOutputStream(LOGGER); - PipedOutputStream ps = new PipedOutputStream(); - in = null; - try { - in = new PipedInputStream(ps); - } catch (IOException e1) { - throw new InterpreterException(e1); - } - ins = new BufferedWriter(new OutputStreamWriter(ps)); - - input = new ByteArrayOutputStream(); - - PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in); - executor.setStreamHandler(streamHandler); - executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); - - try { - Map env = setupPySparkEnv(); - executor.execute(cmd, env, this); - pythonscriptRunning = true; - } catch (IOException e) { - throw new InterpreterException(e); - } - - - try { - input.write("import sys, getopt\n".getBytes()); - ins.flush(); - } catch (IOException e) { - throw new InterpreterException(e); - } - } - - private int findRandomOpenPortOnAllLocalInterfaces() throws InterpreterException { - int port; - try (ServerSocket socket = new ServerSocket(0);) { - port = socket.getLocalPort(); - socket.close(); - } catch (IOException e) { - throw new InterpreterException(e); - } - return port; - } - - @Override - public void close() { - if (iPySparkInterpreter != null) { - iPySparkInterpreter.close(); - return; - } - executor.getWatchdog().destroyProcess(); - new File(scriptPath).delete(); - gatewayServer.shutdown(); - } - - PythonInterpretRequest pythonInterpretRequest = 
null; - - /** - * - */ - public class PythonInterpretRequest { - public String statements; - public String jobGroup; - public String jobDescription; - - public PythonInterpretRequest(String statements, String jobGroup, - String jobDescription) { - this.statements = statements; - this.jobGroup = jobGroup; - this.jobDescription = jobDescription; - } - - public String statements() { - return statements; - } - - public String jobGroup() { - return jobGroup; - } - - public String jobDescription() { - return jobDescription; - } - } - - Integer statementSetNotifier = new Integer(0); - - public PythonInterpretRequest getStatements() { - synchronized (statementSetNotifier) { - while (pythonInterpretRequest == null) { - try { - statementSetNotifier.wait(1000); - } catch (InterruptedException e) { - } - } - PythonInterpretRequest req = pythonInterpretRequest; - pythonInterpretRequest = null; - return req; - } - } - - String statementOutput = null; - boolean statementError = false; - Integer statementFinishedNotifier = new Integer(0); - - public void setStatementsFinished(String out, boolean error) { - synchronized (statementFinishedNotifier) { - LOGGER.debug("Setting python statement output: " + out + ", error: " + error); - statementOutput = out; - statementError = error; - statementFinishedNotifier.notify(); - } - } - - boolean pythonScriptInitialized = false; - Integer pythonScriptInitializeNotifier = new Integer(0); - - public void onPythonScriptInitialized(long pid) { - pythonPid = pid; - synchronized (pythonScriptInitializeNotifier) { - LOGGER.debug("onPythonScriptInitialized is called"); - pythonScriptInitialized = true; - pythonScriptInitializeNotifier.notifyAll(); - } - } - - public void appendOutput(String message) throws IOException { - LOGGER.debug("Output from python process: " + message); - outputStream.getInterpreterOutput().write(message); - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) - throws InterpreterException { - SparkInterpreter sparkInterpreter = getSparkInterpreter(); - sparkInterpreter.populateSparkWebUrl(context); - if (sparkInterpreter.isUnsupportedSparkVersion()) { - return new InterpreterResult(Code.ERROR, "Spark " - + sparkInterpreter.getSparkVersion().toString() + " is not supported"); - } - - if (iPySparkInterpreter != null) { - return iPySparkInterpreter.interpret(st, context); - } - - if (!pythonscriptRunning) { - return new InterpreterResult(Code.ERROR, "python process not running" - + outputStream.toString()); - } - - outputStream.setInterpreterOutput(context.out); - - synchronized (pythonScriptInitializeNotifier) { - long startTime = System.currentTimeMillis(); - while (pythonScriptInitialized == false - && pythonscriptRunning - && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) { - try { - pythonScriptInitializeNotifier.wait(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - List<InterpreterResultMessage> errorMessage; - try { - context.out.flush(); - errorMessage = context.out.toInterpreterResultMessage(); - } catch (IOException e) { - throw new InterpreterException(e); - } - - - if (pythonscriptRunning == false) { - // python script failed to initialize and terminated - errorMessage.add(new InterpreterResultMessage( - InterpreterResult.Type.TEXT, "failed to start pyspark")); - return new InterpreterResult(Code.ERROR, errorMessage); - } - if (pythonScriptInitialized == false) { - // timeout. 
didn't get initialized message - errorMessage.add(new InterpreterResultMessage( - InterpreterResult.Type.TEXT, "pyspark is not responding")); - return new InterpreterResult(Code.ERROR, errorMessage); - } - - if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) { - errorMessage.add(new InterpreterResultMessage( - InterpreterResult.Type.TEXT, - "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported")); - return new InterpreterResult(Code.ERROR, errorMessage); - } - String jobGroup = Utils.buildJobGroupId(context); - String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo()); - SparkZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext(); - __zeppelin__.setInterpreterContext(context); - __zeppelin__.setGui(context.getGui()); - __zeppelin__.setNoteGui(context.getNoteGui()); - pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup, jobDesc); - statementOutput = null; - - synchronized (statementSetNotifier) { - statementSetNotifier.notify(); - } - - synchronized (statementFinishedNotifier) { - while (statementOutput == null) { - try { - statementFinishedNotifier.wait(1000); - } catch (InterruptedException e) { - } - } - } - - if (statementError) { - return new InterpreterResult(Code.ERROR, statementOutput); - } else { - - try { - context.out.flush(); - } catch (IOException e) { - throw new InterpreterException(e); - } - - return new InterpreterResult(Code.SUCCESS); - } - } - - public void interrupt() throws IOException { - if (pythonPid > -1) { - LOGGER.info("Sending SIGINT signal to PID : " + pythonPid); - Runtime.getRuntime().exec("kill -SIGINT " + pythonPid); - } else { - LOGGER.warn("Non UNIX/Linux system, close the interpreter"); - close(); - } - } - - @Override - public void cancel(InterpreterContext context) throws InterpreterException { - if (iPySparkInterpreter != null) { - iPySparkInterpreter.cancel(context); - return; - } - SparkInterpreter sparkInterpreter = getSparkInterpreter(); - sparkInterpreter.cancel(context); - try { - interrupt(); - } catch (IOException e) { - LOGGER.error("Error", e); - } - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) throws InterpreterException { - if (iPySparkInterpreter != null) { - return iPySparkInterpreter.getProgress(context); - } - SparkInterpreter sparkInterpreter = getSparkInterpreter(); - return sparkInterpreter.getProgress(context); - } - - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) throws InterpreterException { - if (iPySparkInterpreter != null) { - return iPySparkInterpreter.completion(buf, cursor, interpreterContext); - } - if (buf.length() < cursor) { - cursor = buf.length(); - } - String completionString = getCompletionTargetString(buf, cursor); - String completionCommand = "completion.getCompletion('" + completionString + "')"; - - //start code for completion - SparkInterpreter sparkInterpreter = getSparkInterpreter(); - if (sparkInterpreter.isUnsupportedSparkVersion() || pythonscriptRunning == false) { - return new LinkedList<>(); - } - - pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "", ""); - statementOutput = null; - - synchronized (statementSetNotifier) { - statementSetNotifier.notify(); - } - - String[] completionList = null; - synchronized (statementFinishedNotifier) { - long startTime = System.currentTimeMillis(); - while (statementOutput == null 
- && pythonscriptRunning) { - try { - if (System.currentTimeMillis() - startTime > MAX_TIMEOUT_SEC * 1000) { - LOGGER.error("pyspark completion didn't have response for {}sec.", MAX_TIMEOUT_SEC); - break; - } - statementFinishedNotifier.wait(1000); - } catch (InterruptedException e) { - // not working - LOGGER.info("wait drop"); - return new LinkedList<>(); - } - } - if (statementError) { - return new LinkedList<>(); - } - Gson gson = new Gson(); - completionList = gson.fromJson(statementOutput, String[].class); - } - //end code for completion - - if (completionList == null) { - return new LinkedList<>(); - } - - List<InterpreterCompletion> results = new LinkedList<>(); - for (String name: completionList) { - results.add(new InterpreterCompletion(name, name, StringUtils.EMPTY)); - } - return results; - } - - private String getCompletionTargetString(String text, int cursor) { - String[] completionSeqCharaters = {" ", "\n", "\t"}; - int completionEndPosition = cursor; - int completionStartPosition = cursor; - int indexOfReverseSeqPostion = cursor; - - String resultCompletionText = ""; - String completionScriptText = ""; - try { - completionScriptText = text.substring(0, cursor); - } - catch (Exception e) { - LOGGER.error(e.toString()); - return null; - } - completionEndPosition = completionScriptText.length(); - - String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString(); - - for (String seqCharacter : completionSeqCharaters) { - indexOfReverseSeqPostion = tempReverseCompletionText.indexOf(seqCharacter); - - if (indexOfReverseSeqPostion < completionStartPosition && indexOfReverseSeqPostion > 0) { - completionStartPosition = indexOfReverseSeqPostion; - } - - } - - if (completionStartPosition == completionEndPosition) { - completionStartPosition = 0; - } - else - { - completionStartPosition = completionEndPosition - completionStartPosition; - } - resultCompletionText = completionScriptText.substring( - completionStartPosition , completionEndPosition); - - return resultCompletionText; - } - - - private SparkInterpreter getSparkInterpreter() throws InterpreterException { - LazyOpenInterpreter lazy = null; - SparkInterpreter spark = null; - Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); - - while (p instanceof WrappedInterpreter) { - if (p instanceof LazyOpenInterpreter) { - lazy = (LazyOpenInterpreter) p; - } - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - spark = (SparkInterpreter) p; - - if (lazy != null) { - lazy.open(); - } - return spark; - } - - private IPySparkInterpreter getIPySparkInterpreter() { - LazyOpenInterpreter lazy = null; - IPySparkInterpreter iPySpark = null; - Interpreter p = getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class.getName()); - - while (p instanceof WrappedInterpreter) { - if (p instanceof LazyOpenInterpreter) { - lazy = (LazyOpenInterpreter) p; - } - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - iPySpark = (IPySparkInterpreter) p; - return iPySpark; - } - - public SparkZeppelinContext getZeppelinContext() throws InterpreterException { - SparkInterpreter sparkIntp = getSparkInterpreter(); - if (sparkIntp != null) { - return getSparkInterpreter().getZeppelinContext(); - } else { - return null; - } - } - - public JavaSparkContext getJavaSparkContext() throws InterpreterException { - SparkInterpreter intp = getSparkInterpreter(); - if (intp == null) { - return null; - } else { - return new JavaSparkContext(intp.getSparkContext()); - } - } - - 
public Object getSparkSession() throws InterpreterException { - SparkInterpreter intp = getSparkInterpreter(); - if (intp == null) { - return null; - } else { - return intp.getSparkSession(); - } - } - - public SparkConf getSparkConf() throws InterpreterException { - JavaSparkContext sc = getJavaSparkContext(); - if (sc == null) { - return null; - } else { - return getJavaSparkContext().getConf(); - } - } - - public SQLContext getSQLContext() throws InterpreterException { - SparkInterpreter intp = getSparkInterpreter(); - if (intp == null) { - return null; - } else { - return intp.getSQLContext(); - } - } - - private DepInterpreter getDepInterpreter() { - Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName()); - if (p == null) { - return null; - } - - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - return (DepInterpreter) p; - } - - - @Override - public void onProcessComplete(int exitValue) { - pythonscriptRunning = false; - LOGGER.info("python process terminated. exit code " + exitValue); - } - - @Override - public void onProcessFailed(ExecuteException e) { - pythonscriptRunning = false; - LOGGER.error("python process failed", e); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java b/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java deleted file mode 100644 index 8182690..0000000 --- a/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.zeppelin.spark; - -import org.apache.commons.lang3.StringUtils; - -import java.io.File; -import java.io.FilenameFilter; -import java.util.ArrayList; -import java.util.List; - -/** - * Util class for PySpark - */ -public class PythonUtils { - - /** - * Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from ZEPPELIN_HOME - * when it is embedded mode. - * - * This method will called in zeppelin server process and spark driver process when it is - * local or yarn-client mode. - */ - public static String sparkPythonPath() { - List<String> pythonPath = new ArrayList<String>(); - String sparkHome = System.getenv("SPARK_HOME"); - String zeppelinHome = System.getenv("ZEPPELIN_HOME"); - if (zeppelinHome == null) { - zeppelinHome = new File("..").getAbsolutePath(); - } - if (sparkHome != null) { - // non-embedded mode when SPARK_HOME is specified. 
- File pyspark = new File(sparkHome, "python/lib/pyspark.zip"); - if (!pyspark.exists()) { - throw new RuntimeException("No pyspark.zip found under " + sparkHome + "/python/lib"); - } - pythonPath.add(pyspark.getAbsolutePath()); - File[] py4j = new File(sparkHome + "/python/lib").listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.startsWith("py4j"); - } - }); - if (py4j.length == 0) { - throw new RuntimeException("No py4j files found under " + sparkHome + "/python/lib"); - } else if (py4j.length > 1) { - throw new RuntimeException("Multiple py4j files found under " + sparkHome + "/python/lib"); - } else { - pythonPath.add(py4j[0].getAbsolutePath()); - } - } else { - // embedded mode - File pyspark = new File(zeppelinHome, "interpreter/spark/pyspark/pyspark.zip"); - if (!pyspark.exists()) { - throw new RuntimeException("No pyspark.zip found: " + pyspark.getAbsolutePath()); - } - pythonPath.add(pyspark.getAbsolutePath()); - File[] py4j = new File(zeppelinHome, "interpreter/spark/pyspark").listFiles( - new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.startsWith("py4j"); - } - }); - if (py4j.length == 0) { - throw new RuntimeException("No py4j files found under " + zeppelinHome + - "/interpreter/spark/pyspark"); - } else if (py4j.length > 1) { - throw new RuntimeException("Multiple py4j files found under " + sparkHome + - "/interpreter/spark/pyspark"); - } else { - pythonPath.add(py4j[0].getAbsolutePath()); - } - } - - // add ${ZEPPELIN_HOME}/interpreter/lib/python for all the cases - pythonPath.add(zeppelinHome + "/interpreter/lib/python"); - return StringUtils.join(pythonPath, ":"); - } -}
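
For readers tracing the PySpark setup this commit relocates: sparkPythonPath() joins pyspark.zip and the py4j zip (taken from SPARK_HOME/python/lib, or from ZEPPELIN_HOME/interpreter/spark/pyspark in embedded mode) plus ZEPPELIN_HOME/interpreter/lib/python with ':'. A small Scala sketch of the resulting value, using hypothetical install locations and py4j version, neither of which is taken from this patch:

    // Hypothetical illustration only; actual paths depend on the installation and Spark build.
    val sparkHome = "/opt/spark"        // assumed SPARK_HOME
    val zeppelinHome = "/opt/zeppelin"  // assumed ZEPPELIN_HOME
    val pythonPath = Seq(
      s"$sparkHome/python/lib/pyspark.zip",
      s"$sparkHome/python/lib/py4j-0.10.4-src.zip", // exact py4j file name varies by Spark version
      s"$zeppelinHome/interpreter/lib/python"
    ).mkString(":")
    // => /opt/spark/python/lib/pyspark.zip:/opt/spark/python/lib/py4j-0.10.4-src.zip:/opt/zeppelin/interpreter/lib/python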