Repository: zeppelin
Updated Branches:
  refs/heads/master ac2e957e2 -> 2eac6872e


ZEPPELIN-3678. Change SparkZeppelinContext to scala implementation

### What is this PR for?

This PR changes SparkZeppelinContext to a Scala implementation, so that it 
is easier for us to implement SparkZeppelinContext in Scala instead of Java. 
This also makes SparkZeppelinContext more readable and maintainable.

### What type of PR is it?
[Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3678

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3108 from zjffdu/ZEPPELIN-3678 and squashes the following commits:

bc3feafc1 [Jeff Zhang] ZEPPELIN-3678. Change SparkZeppelinContext to scala 
implementation


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/2eac6872
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/2eac6872
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/2eac6872

Branch: refs/heads/master
Commit: 2eac6872e22c6f63c0437d78530772357864d4bb
Parents: ac2e957
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Aug 2 10:03:58 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Fri Aug 3 10:25:38 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/spark/SparkZeppelinContext.java    | 233 -------------------
 .../zeppelin/spark/SparkZeppelinContext.scala   | 150 ++++++++++++
 .../interpreter/BaseZeppelinContext.java        |  15 +-
 .../zeppelin/rest/ZeppelinSparkClusterTest.java |   2 +-
 4 files changed, 158 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2eac6872/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
deleted file mode 100644
index 87d5b16..0000000
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.SparkContext;
-import org.apache.zeppelin.annotation.ZeppelinApi;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.display.ui.OptionInput;
-import org.apache.zeppelin.interpreter.BaseZeppelinContext;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
-import scala.Tuple2;
-import scala.Unit;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static scala.collection.JavaConversions.asJavaIterable;
-import static scala.collection.JavaConversions.collectionAsScalaIterable;
-
-
-/**
- * ZeppelinContext for Spark
- */
-public class SparkZeppelinContext extends BaseZeppelinContext {
-
-  private SparkContext sc;
-  private List<Class> supportedClasses;
-  private Map<String, String> interpreterClassMap;
-  private SparkShims sparkShims;
-
-  public SparkZeppelinContext(
-      SparkContext sc,
-      SparkShims sparkShims,
-      InterpreterHookRegistry hooks,
-      int maxResult) {
-    super(hooks, maxResult);
-    this.sc = sc;
-    this.sparkShims = sparkShims;
-    interpreterClassMap = new HashMap();
-    interpreterClassMap.put("spark", 
"org.apache.zeppelin.spark.SparkInterpreter");
-    interpreterClassMap.put("sql", 
"org.apache.zeppelin.spark.SparkSqlInterpreter");
-    interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
-    interpreterClassMap.put("pyspark", 
"org.apache.zeppelin.spark.PySparkInterpreter");
-
-    this.supportedClasses = new ArrayList<>();
-    try {
-      
supportedClasses.add(this.getClass().forName("org.apache.spark.sql.Dataset"));
-    } catch (ClassNotFoundException e) {
-    }
-
-    try {
-      
supportedClasses.add(this.getClass().forName("org.apache.spark.sql.DataFrame"));
-    } catch (ClassNotFoundException e) {
-    }
-
-    if (supportedClasses.isEmpty()) {
-      throw new RuntimeException("Can not load Dataset/DataFrame class");
-    }
-  }
-
-  @Override
-  public List<Class> getSupportedClasses() {
-    return supportedClasses;
-  }
-
-  @Override
-  public Map<String, String> getInterpreterClassMap() {
-    return interpreterClassMap;
-  }
-
-  @Override
-  public String showData(Object obj) {
-    return sparkShims.showDataFrame(obj, maxResult);
-  }
-
-  @ZeppelinApi
-  public Object select(String name, scala.collection.Iterable<Tuple2<Object, 
String>> options) {
-    return select(name, null, options);
-  }
-
-  @ZeppelinApi
-  public Object select(String name, Object defaultValue,
-                       scala.collection.Iterable<Tuple2<Object, String>> 
options) {
-    return select(name, defaultValue, tuplesToParamOptions(options));
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> checkbox(
-      String name,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    List<Object> allChecked = new LinkedList<>();
-    for (Tuple2<Object, String> option : asJavaIterable(options)) {
-      allChecked.add(option._1());
-    }
-    return checkbox(name, collectionAsScalaIterable(allChecked), options);
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> checkbox(
-      String name,
-      scala.collection.Iterable<Object> defaultChecked,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    List<Object> defaultCheckedList = 
Lists.newArrayList(asJavaIterable(defaultChecked).iterator());
-    Collection<Object> checkbox = checkbox(name, defaultCheckedList, 
tuplesToParamOptions(options));
-    List<Object> checkboxList = Arrays.asList(checkbox.toArray());
-    return 
scala.collection.JavaConversions.asScalaBuffer(checkboxList).toSeq();
-  }
-
-  @ZeppelinApi
-  public Object noteSelect(String name, 
scala.collection.Iterable<Tuple2<Object, String>> options) {
-    return noteSelect(name, "", options);
-  }
-
-  @ZeppelinApi
-  public Object noteSelect(String name, Object defaultValue,
-                           scala.collection.Iterable<Tuple2<Object, String>> 
options) {
-    return noteSelect(name, defaultValue, tuplesToParamOptions(options));
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> noteCheckbox(
-      String name,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    List<Object> allChecked = new LinkedList<>();
-    for (Tuple2<Object, String> option : asJavaIterable(options)) {
-      allChecked.add(option._1());
-    }
-    return noteCheckbox(name, collectionAsScalaIterable(allChecked), options);
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> noteCheckbox(
-      String name,
-      scala.collection.Iterable<Object> defaultChecked,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    List<Object> defaultCheckedList = 
Lists.newArrayList(asJavaIterable(defaultChecked).iterator());
-    Collection<Object> checkbox = noteCheckbox(name, defaultCheckedList,
-        tuplesToParamOptions(options));
-    List<Object> checkboxList = Arrays.asList(checkbox.toArray());
-    return 
scala.collection.JavaConversions.asScalaBuffer(checkboxList).toSeq();
-  }
-
-  private OptionInput.ParamOption[] tuplesToParamOptions(
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    int n = options.size();
-    OptionInput.ParamOption[] paramOptions = new OptionInput.ParamOption[n];
-    Iterator<Tuple2<Object, String>> it = asJavaIterable(options).iterator();
-
-    int i = 0;
-    while (it.hasNext()) {
-      Tuple2<Object, String> valueAndDisplayValue = it.next();
-      paramOptions[i++] = new 
OptionInput.ParamOption(valueAndDisplayValue._1(),
-          valueAndDisplayValue._2());
-    }
-
-    return paramOptions;
-  }
-
-  @ZeppelinApi
-  public void angularWatch(String name,
-                           final scala.Function2<Object, Object, Unit> func) {
-    angularWatch(name, interpreterContext.getNoteId(), func);
-  }
-
-  @Deprecated
-  public void angularWatchGlobal(String name,
-                                 final scala.Function2<Object, Object, Unit> 
func) {
-    angularWatch(name, null, func);
-  }
-
-  @ZeppelinApi
-  public void angularWatch(
-      String name,
-      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
-    angularWatch(name, interpreterContext.getNoteId(), func);
-  }
-
-  @Deprecated
-  public void angularWatchGlobal(
-      String name,
-      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
-    angularWatch(name, null, func);
-  }
-
-  private void angularWatch(String name, String noteId,
-                            final scala.Function2<Object, Object, Unit> func) {
-    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) 
{
-      @Override
-      public void watch(Object oldObject, Object newObject,
-                        InterpreterContext context) {
-        func.apply(newObject, newObject);
-      }
-    };
-    angularWatch(name, noteId, w);
-  }
-
-  private void angularWatch(
-      String name,
-      String noteId,
-      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
-    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) 
{
-      @Override
-      public void watch(Object oldObject, Object newObject,
-                        InterpreterContext context) {
-        func.apply(oldObject, newObject, context);
-      }
-    };
-    angularWatch(name, noteId, w);
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2eac6872/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
 
b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
new file mode 100644
index 0000000..0e10d84
--- /dev/null
+++ 
b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark
+
+import java.util
+
+import org.apache.spark.SparkContext
+import org.apache.zeppelin.annotation.ZeppelinApi
+import org.apache.zeppelin.display.AngularObjectWatcher
+import org.apache.zeppelin.display.ui.OptionInput.ParamOption
+import org.apache.zeppelin.interpreter.{BaseZeppelinContext, 
InterpreterContext, InterpreterHookRegistry}
+
+import scala.collection.{JavaConversions, Seq}
+
+
+/**
+  * ZeppelinContext for Spark
+  */
+class SparkZeppelinContext(val sc: SparkContext,
+                           val sparkShims: SparkShims,
+                           val hooks2: InterpreterHookRegistry,
+                           val maxResult2: Int) extends 
BaseZeppelinContext(hooks2, maxResult2) {
+
+  private val interpreterClassMap = Map(
+    "spark" -> "org.apache.zeppelin.spark.SparkInterpreter",
+    "sql" -> "org.apache.zeppelin.spark.SparkSqlInterpreter",
+    "dep" -> "org.apache.zeppelin.spark.DepInterpreter",
+    "pyspark" -> "org.apache.zeppelin.spark.PySparkInterpreter",
+    "ipyspark" -> "org.apache.zeppelin.spark.IPySparkInterpreter",
+    "r" -> "org.apache.zeppelin.spark.SparkRInterpreter"
+  )
+
+  private val supportedClasses = 
scala.collection.mutable.ArrayBuffer[Class[_]]()
+
+  try
+    supportedClasses += Class.forName("org.apache.spark.sql.Dataset")
+  catch {
+    case e: ClassNotFoundException =>
+  }
+
+  try
+    supportedClasses += Class.forName("org.apache.spark.sql.DataFrame")
+  catch {
+    case e: ClassNotFoundException =>
+
+  }
+  if (supportedClasses.isEmpty) throw new RuntimeException("Can not load 
Dataset/DataFrame class")
+
+  override def getSupportedClasses: util.List[Class[_]] =
+    JavaConversions.mutableSeqAsJavaList(supportedClasses)
+
+  override def getInterpreterClassMap: util.Map[String, String] =
+    JavaConversions.mapAsJavaMap(interpreterClassMap)
+
+  override def showData(obj: Any): String = sparkShims.showDataFrame(obj, 
maxResult)
+
+  @ZeppelinApi
+  def select(name: String, options: Seq[(Any, String)]): Any = select(name, 
null, options)
+
+  @ZeppelinApi
+  def select(name: String, defaultValue: Any, options: Seq[(Any, String)]): 
Any =
+    select(name, defaultValue, options.map(e => new ParamOption(e._1, 
e._2)).toArray)
+
+  @ZeppelinApi
+  def checkbox(name: String, options: Seq[(AnyRef, String)]): Seq[Any] = {
+    val javaResult = checkbox(name, 
JavaConversions.seqAsJavaList(options.map(e => e._1)),
+      options.map(e => new ParamOption(e._1, e._2)).toArray)
+    JavaConversions.asScalaBuffer(javaResult)
+  }
+
+  @ZeppelinApi
+  def checkbox(name: String, defaultChecked: Seq[AnyRef], options: Seq[(Any, 
String)]): Seq[Any] = {
+    val defaultCheckedList = JavaConversions.seqAsJavaList(defaultChecked)
+    val javaResult = checkbox(name, defaultCheckedList, options.map(e => new 
ParamOption(e._1, e._2)).toArray)
+    JavaConversions.asScalaBuffer(javaResult)
+  }
+
+  @ZeppelinApi
+  def noteSelect(name: String, options: Seq[(Any, String)]): Any = 
noteSelect(name, "", options)
+
+  @ZeppelinApi
+  def noteSelect(name: String, defaultValue: Any, options: Seq[(Any, 
String)]): AnyRef =
+    noteSelect(name, defaultValue, options.map(e => new ParamOption(e._1, 
e._2)).toArray)
+
+  @ZeppelinApi
+  def noteCheckbox(name: String, options: Seq[(AnyRef, String)]): Seq[AnyRef] 
= {
+    val javaResulst =noteCheckbox(name, 
JavaConversions.seqAsJavaList(options.map(e => e._1)),
+      options.map(e => new ParamOption(e._1, e._2)).toArray)
+    JavaConversions.asScalaBuffer(javaResulst)
+  }
+
+  @ZeppelinApi
+  def noteCheckbox(name: String, defaultChecked: Seq[AnyRef], options: 
Seq[(AnyRef, String)]): Seq[AnyRef] = {
+    val defaultCheckedList = JavaConversions.seqAsJavaList(defaultChecked)
+    val javaResult = noteCheckbox(name, defaultCheckedList, options.map(e => 
new ParamOption(e._1, e._2)).toArray)
+    JavaConversions.asScalaBuffer(javaResult)
+  }
+
+  @ZeppelinApi def angularWatch(name: String, func: (AnyRef, AnyRef) => Unit): 
Unit = {
+    angularWatch(name, interpreterContext.getNoteId, func)
+  }
+
+  @deprecated def angularWatchGlobal(name: String, func: (AnyRef, AnyRef) => 
Unit): Unit = {
+    angularWatch(name, null, func)
+  }
+
+  @ZeppelinApi def angularWatch(name: String,
+                                func: (AnyRef, AnyRef, InterpreterContext) => 
Unit): Unit = {
+    angularWatch(name, interpreterContext.getNoteId, func)
+  }
+
+  @deprecated def angularWatchGlobal(name: String,
+                                     func: (AnyRef, AnyRef, 
InterpreterContext) => Unit): Unit = {
+    angularWatch(name, null, func)
+  }
+
+  private def angularWatch(name: String, noteId: String, func: (AnyRef, 
AnyRef) => Unit): Unit = {
+    val w = new AngularObjectWatcher(getInterpreterContext) {
+      override def watch(oldObject: Any, newObject: AnyRef, context: 
InterpreterContext): Unit = {
+        func(newObject, newObject)
+      }
+    }
+    angularWatch(name, noteId, w)
+  }
+
+  private def angularWatch(name: String, noteId: String,
+                           func: (AnyRef, AnyRef, InterpreterContext) => 
Unit): Unit = {
+    val w = new AngularObjectWatcher(getInterpreterContext) {
+      override def watch(oldObject: AnyRef, newObject: AnyRef, context: 
InterpreterContext): Unit = {
+        func(oldObject, newObject, context)
+      }
+    }
+    angularWatch(name, noteId, w)
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2eac6872/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index 953f09c..6a44f12 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -117,12 +116,12 @@ public abstract class BaseZeppelinContext {
   }
 
   @ZeppelinApi
-  public Collection<Object> checkbox(String name, ParamOption[] options) {
+  public List<Object> checkbox(String name, ParamOption[] options) {
     return checkbox(name, options, false);
   }
 
   @ZeppelinApi
-  public Collection<Object> checkbox(String name, List<Object> defaultChecked,
+  public List<Object> checkbox(String name, List<Object> defaultChecked,
                                      ParamOption[] options) {
     return checkbox(name, defaultChecked, options, false);
   }
@@ -143,12 +142,12 @@ public abstract class BaseZeppelinContext {
   }
 
   @ZeppelinApi
-  public Collection<Object> noteCheckbox(String name, ParamOption[] options) {
+  public List<Object> noteCheckbox(String name, ParamOption[] options) {
     return checkbox(name, options, true);
   }
 
   @ZeppelinApi
-  public Collection<Object> noteCheckbox(String name, List<Object> 
defaultChecked,
+  public List<Object> noteCheckbox(String name, List<Object> defaultChecked,
                                          ParamOption[] options) {
     return checkbox(name, defaultChecked, options, true);
   }
@@ -176,7 +175,7 @@ public abstract class BaseZeppelinContext {
     }
   }
 
-  private Collection<Object> checkbox(String name, ParamOption[] options,
+  private List<Object> checkbox(String name, ParamOption[] options,
                                       boolean noteForm) {
     List<Object> defaultValues = new LinkedList<>();
     for (ParamOption option : options) {
@@ -189,7 +188,7 @@ public abstract class BaseZeppelinContext {
     }
   }
 
-  private Collection<Object> checkbox(String name, List<Object> defaultChecked,
+  private List<Object> checkbox(String name, List<Object> defaultChecked,
                                       ParamOption[] options, boolean noteForm) 
{
     if (noteForm) {
       return noteGui.checkbox(name, defaultChecked, options);
@@ -226,7 +225,7 @@ public abstract class BaseZeppelinContext {
   public void setMaxResult(int maxResult) {
     this.maxResult = maxResult;
   }
-
+  
   /**
    * display special types of objects for interpreter.
    * Each interpreter can has its own supported classes.

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2eac6872/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 757c5c9..ef025e6 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -524,7 +524,7 @@ public class ZeppelinSparkClusterTest extends 
AbstractTestRestApi {
     assertEquals("null", result[1]);
     assertEquals("1", result[2]);
     assertEquals("2", result[3]);
-    assertEquals("items: Seq[Object] = Buffer(2)", result[4]);
+    assertEquals("items: Seq[Any] = Buffer(2)", result[4]);
   }
 
   @Test

Reply via email to