Repository: zeppelin Updated Branches: refs/heads/master ac2e957e2 -> 2eac6872e
ZEPPELIN-3678. Change SparkZeppelinContext to scala implementation ### What is this PR for? This PR just changes SparkZeppelinContext to a Scala implementation, so that it is easy for us to implement SparkZeppelinContext via Scala instead of Java. This also makes SparkZeppelinContext more readable and maintainable. ### What type of PR is it? [Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3678 ### How should this be tested? * CI passes ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3108 from zjffdu/ZEPPELIN-3678 and squashes the following commits: bc3feafc1 [Jeff Zhang] ZEPPELIN-3678. Change SparkZeppelinContext to scala implementation Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/2eac6872 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/2eac6872 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/2eac6872 Branch: refs/heads/master Commit: 2eac6872e22c6f63c0437d78530772357864d4bb Parents: ac2e957 Author: Jeff Zhang <zjf...@apache.org> Authored: Thu Aug 2 10:03:58 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Fri Aug 3 10:25:38 2018 +0800 ---------------------------------------------------------------------- .../zeppelin/spark/SparkZeppelinContext.java | 233 ------------------- .../zeppelin/spark/SparkZeppelinContext.scala | 150 ++++++++++++ .../interpreter/BaseZeppelinContext.java | 15 +- .../zeppelin/rest/ZeppelinSparkClusterTest.java | 2 +- 4 files changed, 158 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2eac6872/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java 
---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java deleted file mode 100644 index 87d5b16..0000000 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.spark; - -import com.google.common.collect.Lists; -import org.apache.spark.SparkContext; -import org.apache.zeppelin.annotation.ZeppelinApi; -import org.apache.zeppelin.display.AngularObjectWatcher; -import org.apache.zeppelin.display.ui.OptionInput; -import org.apache.zeppelin.interpreter.BaseZeppelinContext; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterHookRegistry; -import scala.Tuple2; -import scala.Unit; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import static scala.collection.JavaConversions.asJavaIterable; -import static scala.collection.JavaConversions.collectionAsScalaIterable; - - -/** - * ZeppelinContext for Spark - */ -public class SparkZeppelinContext extends BaseZeppelinContext { - - private SparkContext sc; - private List<Class> supportedClasses; - private Map<String, String> interpreterClassMap; - private SparkShims sparkShims; - - public SparkZeppelinContext( - SparkContext sc, - SparkShims sparkShims, - InterpreterHookRegistry hooks, - int maxResult) { - super(hooks, maxResult); - this.sc = sc; - this.sparkShims = sparkShims; - interpreterClassMap = new HashMap(); - interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter"); - interpreterClassMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter"); - interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter"); - interpreterClassMap.put("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter"); - - this.supportedClasses = new ArrayList<>(); - try { - supportedClasses.add(this.getClass().forName("org.apache.spark.sql.Dataset")); - } catch (ClassNotFoundException e) { - } - - try { - supportedClasses.add(this.getClass().forName("org.apache.spark.sql.DataFrame")); - } catch 
(ClassNotFoundException e) { - } - - if (supportedClasses.isEmpty()) { - throw new RuntimeException("Can not load Dataset/DataFrame class"); - } - } - - @Override - public List<Class> getSupportedClasses() { - return supportedClasses; - } - - @Override - public Map<String, String> getInterpreterClassMap() { - return interpreterClassMap; - } - - @Override - public String showData(Object obj) { - return sparkShims.showDataFrame(obj, maxResult); - } - - @ZeppelinApi - public Object select(String name, scala.collection.Iterable<Tuple2<Object, String>> options) { - return select(name, null, options); - } - - @ZeppelinApi - public Object select(String name, Object defaultValue, - scala.collection.Iterable<Tuple2<Object, String>> options) { - return select(name, defaultValue, tuplesToParamOptions(options)); - } - - @ZeppelinApi - public scala.collection.Seq<Object> checkbox( - String name, - scala.collection.Iterable<Tuple2<Object, String>> options) { - List<Object> allChecked = new LinkedList<>(); - for (Tuple2<Object, String> option : asJavaIterable(options)) { - allChecked.add(option._1()); - } - return checkbox(name, collectionAsScalaIterable(allChecked), options); - } - - @ZeppelinApi - public scala.collection.Seq<Object> checkbox( - String name, - scala.collection.Iterable<Object> defaultChecked, - scala.collection.Iterable<Tuple2<Object, String>> options) { - List<Object> defaultCheckedList = Lists.newArrayList(asJavaIterable(defaultChecked).iterator()); - Collection<Object> checkbox = checkbox(name, defaultCheckedList, tuplesToParamOptions(options)); - List<Object> checkboxList = Arrays.asList(checkbox.toArray()); - return scala.collection.JavaConversions.asScalaBuffer(checkboxList).toSeq(); - } - - @ZeppelinApi - public Object noteSelect(String name, scala.collection.Iterable<Tuple2<Object, String>> options) { - return noteSelect(name, "", options); - } - - @ZeppelinApi - public Object noteSelect(String name, Object defaultValue, - 
scala.collection.Iterable<Tuple2<Object, String>> options) { - return noteSelect(name, defaultValue, tuplesToParamOptions(options)); - } - - @ZeppelinApi - public scala.collection.Seq<Object> noteCheckbox( - String name, - scala.collection.Iterable<Tuple2<Object, String>> options) { - List<Object> allChecked = new LinkedList<>(); - for (Tuple2<Object, String> option : asJavaIterable(options)) { - allChecked.add(option._1()); - } - return noteCheckbox(name, collectionAsScalaIterable(allChecked), options); - } - - @ZeppelinApi - public scala.collection.Seq<Object> noteCheckbox( - String name, - scala.collection.Iterable<Object> defaultChecked, - scala.collection.Iterable<Tuple2<Object, String>> options) { - List<Object> defaultCheckedList = Lists.newArrayList(asJavaIterable(defaultChecked).iterator()); - Collection<Object> checkbox = noteCheckbox(name, defaultCheckedList, - tuplesToParamOptions(options)); - List<Object> checkboxList = Arrays.asList(checkbox.toArray()); - return scala.collection.JavaConversions.asScalaBuffer(checkboxList).toSeq(); - } - - private OptionInput.ParamOption[] tuplesToParamOptions( - scala.collection.Iterable<Tuple2<Object, String>> options) { - int n = options.size(); - OptionInput.ParamOption[] paramOptions = new OptionInput.ParamOption[n]; - Iterator<Tuple2<Object, String>> it = asJavaIterable(options).iterator(); - - int i = 0; - while (it.hasNext()) { - Tuple2<Object, String> valueAndDisplayValue = it.next(); - paramOptions[i++] = new OptionInput.ParamOption(valueAndDisplayValue._1(), - valueAndDisplayValue._2()); - } - - return paramOptions; - } - - @ZeppelinApi - public void angularWatch(String name, - final scala.Function2<Object, Object, Unit> func) { - angularWatch(name, interpreterContext.getNoteId(), func); - } - - @Deprecated - public void angularWatchGlobal(String name, - final scala.Function2<Object, Object, Unit> func) { - angularWatch(name, null, func); - } - - @ZeppelinApi - public void angularWatch( - String name, - 
final scala.Function3<Object, Object, InterpreterContext, Unit> func) { - angularWatch(name, interpreterContext.getNoteId(), func); - } - - @Deprecated - public void angularWatchGlobal( - String name, - final scala.Function3<Object, Object, InterpreterContext, Unit> func) { - angularWatch(name, null, func); - } - - private void angularWatch(String name, String noteId, - final scala.Function2<Object, Object, Unit> func) { - AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) { - @Override - public void watch(Object oldObject, Object newObject, - InterpreterContext context) { - func.apply(newObject, newObject); - } - }; - angularWatch(name, noteId, w); - } - - private void angularWatch( - String name, - String noteId, - final scala.Function3<Object, Object, InterpreterContext, Unit> func) { - AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) { - @Override - public void watch(Object oldObject, Object newObject, - InterpreterContext context) { - func.apply(oldObject, newObject, context); - } - }; - angularWatch(name, noteId, w); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2eac6872/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala new file mode 100644 index 0000000..0e10d84 --- /dev/null +++ b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark + +import java.util + +import org.apache.spark.SparkContext +import org.apache.zeppelin.annotation.ZeppelinApi +import org.apache.zeppelin.display.AngularObjectWatcher +import org.apache.zeppelin.display.ui.OptionInput.ParamOption +import org.apache.zeppelin.interpreter.{BaseZeppelinContext, InterpreterContext, InterpreterHookRegistry} + +import scala.collection.{JavaConversions, Seq} + + +/** + * ZeppelinContext for Spark + */ +class SparkZeppelinContext(val sc: SparkContext, + val sparkShims: SparkShims, + val hooks2: InterpreterHookRegistry, + val maxResult2: Int) extends BaseZeppelinContext(hooks2, maxResult2) { + + private val interpreterClassMap = Map( + "spark" -> "org.apache.zeppelin.spark.SparkInterpreter", + "sql" -> "org.apache.zeppelin.spark.SparkSqlInterpreter", + "dep" -> "org.apache.zeppelin.spark.DepInterpreter", + "pyspark" -> "org.apache.zeppelin.spark.PySparkInterpreter", + "ipyspark" -> "org.apache.zeppelin.spark.IPySparkInterpreter", + "r" -> "org.apache.zeppelin.spark.SparkRInterpreter" + ) + + private val supportedClasses = scala.collection.mutable.ArrayBuffer[Class[_]]() + + try + supportedClasses += Class.forName("org.apache.spark.sql.Dataset") + catch { + case e: ClassNotFoundException => + } + + try + supportedClasses += Class.forName("org.apache.spark.sql.DataFrame") + catch { + case e: ClassNotFoundException => + + } + 
if (supportedClasses.isEmpty) throw new RuntimeException("Can not load Dataset/DataFrame class") + + override def getSupportedClasses: util.List[Class[_]] = + JavaConversions.mutableSeqAsJavaList(supportedClasses) + + override def getInterpreterClassMap: util.Map[String, String] = + JavaConversions.mapAsJavaMap(interpreterClassMap) + + override def showData(obj: Any): String = sparkShims.showDataFrame(obj, maxResult) + + @ZeppelinApi + def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options) + + @ZeppelinApi + def select(name: String, defaultValue: Any, options: Seq[(Any, String)]): Any = + select(name, defaultValue, options.map(e => new ParamOption(e._1, e._2)).toArray) + + @ZeppelinApi + def checkbox(name: String, options: Seq[(AnyRef, String)]): Seq[Any] = { + val javaResult = checkbox(name, JavaConversions.seqAsJavaList(options.map(e => e._1)), + options.map(e => new ParamOption(e._1, e._2)).toArray) + JavaConversions.asScalaBuffer(javaResult) + } + + @ZeppelinApi + def checkbox(name: String, defaultChecked: Seq[AnyRef], options: Seq[(Any, String)]): Seq[Any] = { + val defaultCheckedList = JavaConversions.seqAsJavaList(defaultChecked) + val javaResult = checkbox(name, defaultCheckedList, options.map(e => new ParamOption(e._1, e._2)).toArray) + JavaConversions.asScalaBuffer(javaResult) + } + + @ZeppelinApi + def noteSelect(name: String, options: Seq[(Any, String)]): Any = noteSelect(name, "", options) + + @ZeppelinApi + def noteSelect(name: String, defaultValue: Any, options: Seq[(Any, String)]): AnyRef = + noteSelect(name, defaultValue, options.map(e => new ParamOption(e._1, e._2)).toArray) + + @ZeppelinApi + def noteCheckbox(name: String, options: Seq[(AnyRef, String)]): Seq[AnyRef] = { + val javaResulst =noteCheckbox(name, JavaConversions.seqAsJavaList(options.map(e => e._1)), + options.map(e => new ParamOption(e._1, e._2)).toArray) + JavaConversions.asScalaBuffer(javaResulst) + } + + @ZeppelinApi + def noteCheckbox(name: 
String, defaultChecked: Seq[AnyRef], options: Seq[(AnyRef, String)]): Seq[AnyRef] = { + val defaultCheckedList = JavaConversions.seqAsJavaList(defaultChecked) + val javaResult = noteCheckbox(name, defaultCheckedList, options.map(e => new ParamOption(e._1, e._2)).toArray) + JavaConversions.asScalaBuffer(javaResult) + } + + @ZeppelinApi def angularWatch(name: String, func: (AnyRef, AnyRef) => Unit): Unit = { + angularWatch(name, interpreterContext.getNoteId, func) + } + + @deprecated def angularWatchGlobal(name: String, func: (AnyRef, AnyRef) => Unit): Unit = { + angularWatch(name, null, func) + } + + @ZeppelinApi def angularWatch(name: String, + func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = { + angularWatch(name, interpreterContext.getNoteId, func) + } + + @deprecated def angularWatchGlobal(name: String, + func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = { + angularWatch(name, null, func) + } + + private def angularWatch(name: String, noteId: String, func: (AnyRef, AnyRef) => Unit): Unit = { + val w = new AngularObjectWatcher(getInterpreterContext) { + override def watch(oldObject: Any, newObject: AnyRef, context: InterpreterContext): Unit = { + func(newObject, newObject) + } + } + angularWatch(name, noteId, w) + } + + private def angularWatch(name: String, noteId: String, + func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = { + val w = new AngularObjectWatcher(getInterpreterContext) { + override def watch(oldObject: AnyRef, newObject: AnyRef, context: InterpreterContext): Unit = { + func(oldObject, newObject, context) + } + } + angularWatch(name, noteId, w) + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2eac6872/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java index 953f09c..6a44f12 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java @@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -117,12 +116,12 @@ public abstract class BaseZeppelinContext { } @ZeppelinApi - public Collection<Object> checkbox(String name, ParamOption[] options) { + public List<Object> checkbox(String name, ParamOption[] options) { return checkbox(name, options, false); } @ZeppelinApi - public Collection<Object> checkbox(String name, List<Object> defaultChecked, + public List<Object> checkbox(String name, List<Object> defaultChecked, ParamOption[] options) { return checkbox(name, defaultChecked, options, false); } @@ -143,12 +142,12 @@ public abstract class BaseZeppelinContext { } @ZeppelinApi - public Collection<Object> noteCheckbox(String name, ParamOption[] options) { + public List<Object> noteCheckbox(String name, ParamOption[] options) { return checkbox(name, options, true); } @ZeppelinApi - public Collection<Object> noteCheckbox(String name, List<Object> defaultChecked, + public List<Object> noteCheckbox(String name, List<Object> defaultChecked, ParamOption[] options) { return checkbox(name, defaultChecked, options, true); } @@ -176,7 +175,7 @@ public abstract class BaseZeppelinContext { } } - private Collection<Object> checkbox(String name, ParamOption[] options, + private List<Object> checkbox(String name, ParamOption[] options, boolean noteForm) { List<Object> defaultValues = new LinkedList<>(); for (ParamOption option : options) { @@ -189,7 +188,7 @@ public abstract class BaseZeppelinContext { } } - private Collection<Object> checkbox(String 
name, List<Object> defaultChecked, + private List<Object> checkbox(String name, List<Object> defaultChecked, ParamOption[] options, boolean noteForm) { if (noteForm) { return noteGui.checkbox(name, defaultChecked, options); @@ -226,7 +225,7 @@ public abstract class BaseZeppelinContext { public void setMaxResult(int maxResult) { this.maxResult = maxResult; } - + /** * display special types of objects for interpreter. * Each interpreter can has its own supported classes. http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2eac6872/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 757c5c9..ef025e6 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -524,7 +524,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("null", result[1]); assertEquals("1", result[2]); assertEquals("2", result[3]); - assertEquals("items: Seq[Object] = Buffer(2)", result[4]); + assertEquals("items: Seq[Any] = Buffer(2)", result[4]); } @Test