Repository: incubator-zeppelin
Updated Branches:
refs/heads/master bd658e7c3 -> 23922ae48
Add Scala utility functions for display
Until now, there have been two ways to display data as a table:
1. Either use a **Spark DataFrame** and Zeppelin's built-in support
2. Or manually generate a `println("%table ...")`. For example, to display an
`RDD[(String,String,Int)]` representing a collection of users:
```scala
// Collect to the driver first: appending to a driver-side StringBuilder
// inside rdd.foreach would happen on the executors and be lost.
val data = new java.lang.StringBuilder("%table Login\tName\tAge\n")
rdd.collect().foreach {
  case (login, name, age) => data.append(s"$login\t$name\t$age\n")
}
println(data.toString())
```
My proposal is to add a new utility function that makes creating tables easier
than the code example above. Of course one can always use a **Spark
DataFrame**, but I find it quite restrictive: people using Spark versions
earlier than 1.3 cannot rely on DataFrame, and sometimes one does not want to
transform an RDD into a DataFrame just for display.
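For comparison, the DataFrame route looks roughly like this (a sketch assuming
Spark 1.3+, where `toDF` comes from `sqlContext.implicits._`):
```scala
// DataFrame alternative, Spark 1.3+ only: register the data as a temp table
// and let Zeppelin's built-in rendering display it from a %sql paragraph.
import sqlContext.implicits._

val df = rdd.toDF("login", "name", "age")
df.registerTempTable("users")
// then, in a %sql paragraph: SELECT * FROM users
```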
How are the utility functions implemented?
1. I added a new module **spark-utils** which provides the Scala code for the
display utility functions. This module uses the **maven-scala-plugin** to
compile all the classes in the package `org.apache.zeppelin.spark.utils`.
2. Right now the package `org.apache.zeppelin.spark.utils` contains a single
object, `DisplayUtils`, which augments RDDs and **Scala Traversables** of
tuples or case classes (all of them subclasses of the trait `Product`; see the
illustration below) with the new method `displayAsTable(columnLabels: String*)`.
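Both tuples and case classes extend `Product`, which exposes their fields
generically; that is what lets a single implementation handle both. A minimal
illustration (plain Scala, not part of the patch):
```scala
case class Person(login: String, name: String, age: Int)

val t = ("jdoe", "John DOE", 32)        // Tuple3 extends Product
val p = Person("jdoe", "John DOE", 32)  // case classes extend Product too

t.productArity                          // 3: the number of fields
p.productIterator.mkString("\t")        // "jdoe\tJohn DOE\t32"
```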
3. The `DisplayUtils` object is imported automatically into the
`SparkInterpreter` with
`intp.interpret("import org.apache.zeppelin.spark.utils.DisplayUtils._");`
4. The Maven module **interpreter** now has a **runtime** dependency on the
module **spark-utils**, so that the utility classes are loaded at runtime.
5. The new display utility function is used as follows:
**Paragraph1**
```scala
case class Person(login: String, name: String, age: Int)

val rddTuples: RDD[(String, String, Int)] =
  sc.parallelize(List(("jdoe", "John DOE", 32), ("hsue", "Helen SUE", 27)))

val rddCaseClass: RDD[Person] =
  sc.parallelize(List(Person("jdoe", "John DOE", 32), Person("hsue", "Helen SUE", 27)))
```
**Paragraph2**
```scala
rddTuples.displayAsTable("Login","Name","Age")
```
**Paragraph3**
```scala
rddCaseClass.displayAsTable("Login","Name","Age")
```
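Both display calls print the same `%table` payload, which Zeppelin renders as
a table (tabs shown as `\t`, matching the test expectations in the diff below):
```
%table Login\tName\tAge
jdoe\tJohn DOE\t32
hsue\tHelen SUE\t27
```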
6. The `displayAsTable()` method is error-proof: if the user provides **more**
column labels than the number of elements in the tuple/case class, the extra
labels are ignored. If the user provides **fewer** column labels than
expected, the method pads the missing column headers with **Column2**,
**Column3**, etc. (see the sketch below).
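For instance (a sketch using `rddTuples` from the usage example above; the
method keeps its original name here, although the squashed commits later
rename it to `display()`):
```scala
// Too many labels: "xxx" and "yyy" are ignored for 3-field tuples.
rddTuples.displayAsTable("Login", "Name", "Age", "xxx", "yyy")
// prints: %table Login\tName\tAge ...

// Too few labels: the missing headers are padded as Column2, Column3.
rddTuples.displayAsTable("Login")
// prints: %table Login\tColumn2\tColumn3 ...
```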
7. In addition to the `displayAsTable` methods, I added some other utility
methods to make it easier to handle custom HTML and images (a usage sketch
follows the list):
a. calling `html()` will generate the string `"%html "`
b. calling `html("<p> This is a test</p>")` will generate the string `"%html <p> This is a test</p>"`
c. calling `img("http://www.google.com")` will generate the string `"<img src='http://www.google.com' />"`
d. calling `img64()` will generate the string `"%img "`
e. calling `img64("ABCDE123")` will generate the string `"%img ABCDE123"`
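A quick sketch combining these helpers in a notebook paragraph (assumes the
automatic import from point 3):
```scala
// html() prefixes its argument with "%html "; img() builds an <img> tag.
println(html("<h3>Users</h3>" + img("http://www.google.com")))
// prints: %html <h3>Users</h3><img src='http://www.google.com' />
```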
Of course, the `DisplayUtils` object can be extended with other functions to
support more advanced display features.
Author: DuyHai DOAN <[email protected]>
Closes #80 from doanduyhai/DisplayUtils and squashes the following commits:
62a2311 [DuyHai DOAN] Add default limit to RDD using zeppelin.spark.maxResult property
47a1b1f [DuyHai DOAN] Rename displayAsTable() to display()
a15294e [DuyHai DOAN] Enhance display function utility to accept Scala Traversable in addition to RDD
c1ee8fe [DuyHai DOAN] Add Scala code in module spark to expose utility functions for display
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/23922ae4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/23922ae4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/23922ae4
Branch: refs/heads/master
Commit: 23922ae488fc63b50d87524421cd93c3dba3024c
Parents: bd658e7
Author: DuyHai DOAN <[email protected]>
Authored: Fri Jun 12 13:35:48 2015 +0200
Committer: Lee moon soo <[email protected]>
Committed: Thu Jun 18 21:14:40 2015 -0700
----------------------------------------------------------------------
spark/pom.xml | 36 ++++
.../apache/zeppelin/spark/SparkInterpreter.java | 8 +
.../zeppelin/spark/utils/DisplayUtils.scala | 90 ++++++++++
.../spark/utils/DisplayFunctionsTest.scala | 173 +++++++++++++++++++
zeppelin-interpreter/pom.xml | 1 +
5 files changed, 308 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/23922ae4/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 4610167..4b82ac1 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -381,6 +381,14 @@
<version>1.1</version>
</dependency>
+ <!--TEST-->
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <version>2.2.4</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -778,6 +786,34 @@
</executions>
</plugin>
+ <!-- Plugin to compile Scala code -->
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>test-compile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/23922ae4/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 935b2a5..1c4c5e7 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -461,6 +461,14 @@ public class SparkInterpreter extends Interpreter {
intp.interpret("import org.apache.spark.sql.functions._");
}
+ // Utility functions for display
+ intp.interpret("import org.apache.zeppelin.spark.utils.DisplayUtils._");
+
+ // Scala implicit value for spark.maxResult
+ intp.interpret("import org.apache.zeppelin.spark.utils.SparkMaxResult");
+ intp.interpret("implicit val sparkMaxResult = new SparkMaxResult(" +
+ Integer.parseInt(getProperty("zeppelin.spark.maxResult")) + ")");
+
// add jar
if (depInterpreter != null) {
DependencyContext depc = depInterpreter.getDependencyContext();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/23922ae4/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala
new file mode 100644
index 0000000..8181434
--- /dev/null
+++ b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark.utils
+
+import java.lang.StringBuilder
+
+import org.apache.spark.rdd.RDD
+
+import scala.collection.IterableLike
+
+object DisplayUtils {
+
+ implicit def toDisplayRDDFunctions[T <: Product](rdd: RDD[T]): DisplayRDDFunctions[T] = new DisplayRDDFunctions[T](rdd)
+
+ implicit def toDisplayTraversableFunctions[T <: Product](traversable: Traversable[T]): DisplayTraversableFunctions[T] = new DisplayTraversableFunctions[T](traversable)
+
+ def html(htmlContent: String = "") = s"%html $htmlContent"
+
+ def img64(base64Content: String = "") = s"%img $base64Content"
+
+ def img(url: String) = s"<img src='$url' />"
+}
+
+trait DisplayCollection[T <: Product] {
+
+ def printFormattedData(traversable: Traversable[T], columnLabels: String*): Unit = {
+ val providedLabelCount: Int = columnLabels.size
+ var maxColumnCount:Int = 1
+ val headers = new StringBuilder("%table ")
+
+ val data = new StringBuilder("")
+
+ traversable.foreach(tuple => {
+ maxColumnCount = math.max(maxColumnCount,tuple.productArity)
+ data.append(tuple.productIterator.mkString("\t")).append("\n")
+ })
+
+ if (providedLabelCount > maxColumnCount) {
+ headers.append(columnLabels.take(maxColumnCount).mkString("\t")).append("\n")
+ } else if (providedLabelCount < maxColumnCount) {
+ val missingColumnHeaders = ((providedLabelCount+1) to maxColumnCount).foldLeft[String](""){
+ (stringAccumulator,index) => if (index==1) s"Column$index" else s"$stringAccumulator\tColumn$index"
+ }
+
+ headers.append(columnLabels.mkString("\t")).append(missingColumnHeaders).append("\n")
+ } else {
+ headers.append(columnLabels.mkString("\t")).append("\n")
+ }
+
+ headers.append(data)
+
+ print(headers.toString)
+ }
+
+}
+
+class DisplayRDDFunctions[T <: Product] (val rdd: RDD[T]) extends DisplayCollection[T] {
+
+ def display(columnLabels: String*)(implicit sparkMaxResult: SparkMaxResult): Unit = {
+ printFormattedData(rdd.take(sparkMaxResult.maxResult), columnLabels: _*)
+ }
+
+ def display(sparkMaxResult:Int, columnLabels: String*): Unit = {
+ printFormattedData(rdd.take(sparkMaxResult), columnLabels: _*)
+ }
+}
+
+class DisplayTraversableFunctions[T <: Product] (val traversable: Traversable[T]) extends DisplayCollection[T] {
+
+ def display(columnLabels: String*): Unit = {
+ printFormattedData(traversable, columnLabels: _*)
+ }
+}
+
+class SparkMaxResult(val maxResult: Int) extends Serializable
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/23922ae4/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala
new file mode 100644
index 0000000..2638f17
--- /dev/null
+++ b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.spark.utils
+
+import java.io.ByteArrayOutputStream
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest._
+import org.scalatest.{BeforeAndAfter}
+
+case class Person(login : String, name: String, age: Int)
+
+class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAfterEach with Matchers {
+ var sc: SparkContext = null
+ var testTuples:List[(String, String, Int)] = null
+ var testPersons:List[Person] = null
+ var testRDDTuples: RDD[(String,String,Int)] = null
+ var testRDDPersons: RDD[Person] = null
+ var stream: ByteArrayOutputStream = null
+
+ before {
+ val sparkConf: SparkConf = new SparkConf(true)
+ .setAppName("test-DisplayFunctions")
+ .setMaster("local")
+ sc = new SparkContext(sparkConf)
+ testTuples = List(("jdoe", "John DOE", 32), ("hsue", "Helen SUE", 27), ("rsmith", "Richard SMITH", 45))
+ testRDDTuples = sc.parallelize(testTuples)
+ testPersons = List(Person("jdoe", "John DOE", 32), Person("hsue", "Helen SUE", 27), Person("rsmith", "Richard SMITH", 45))
+ testRDDPersons = sc.parallelize(testPersons)
+ }
+
+ override def beforeEach() {
+ stream = new java.io.ByteArrayOutputStream()
+ super.beforeEach() // To be stackable, must call super.beforeEach
+ }
+
+
+ "DisplayFunctions" should "generate correct column headers for tuples" in {
+ implicit val sparkMaxResult = new SparkMaxResult(100)
+ Console.withOut(stream) {
+ new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age")
+ }
+
+ stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+ "jdoe\tJohn DOE\t32\n" +
+ "hsue\tHelen SUE\t27\n" +
+ "rsmith\tRichard SMITH\t45\n")
+ }
+
+ "DisplayFunctions" should "generate correct column headers for case class"
in {
+ implicit val sparkMaxResult = new SparkMaxResult(100)
+ Console.withOut(stream) {
+ new DisplayRDDFunctions[Person](testRDDPersons).display("Login","Name","Age")
+ }
+
+ stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+ "jdoe\tJohn DOE\t32\n" +
+ "hsue\tHelen SUE\t27\n" +
+ "rsmith\tRichard SMITH\t45\n")
+ }
+
+ "DisplayFunctions" should "truncate exceeding column headers for tuples" in {
+ implicit val sparkMaxResult = new SparkMaxResult(100)
+ Console.withOut(stream) {
+ new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age","xxx","yyy")
+ }
+
+ stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+ "jdoe\tJohn DOE\t32\n" +
+ "hsue\tHelen SUE\t27\n" +
+ "rsmith\tRichard SMITH\t45\n")
+ }
+
+ "DisplayFunctions" should "pad missing column headers with ColumnXXX for
tuples" in {
+ implicit val sparkMaxResult = new SparkMaxResult(100)
+ Console.withOut(stream) {
+ new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login")
+ }
+
+ stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" +
+ "jdoe\tJohn DOE\t32\n" +
+ "hsue\tHelen SUE\t27\n" +
+ "rsmith\tRichard SMITH\t45\n")
+ }
+
+ "DisplayUtils" should "restricts RDD to sparkMaxresult with implicit limit"
in {
+
+ implicit val sparkMaxResult = new SparkMaxResult(2)
+
+ Console.withOut(stream) {
+ new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login")
+ }
+
+ stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" +
+ "jdoe\tJohn DOE\t32\n" +
+ "hsue\tHelen SUE\t27\n")
+ }
+
+ "DisplayUtils" should "restricts RDD to sparkMaxresult with explicit limit"
in {
+
+ implicit val sparkMaxResult = new SparkMaxResult(2)
+
+ Console.withOut(stream) {
+ new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display(1,"Login")
+ }
+
+ stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" +
+ "jdoe\tJohn DOE\t32\n")
+ }
+
+ "DisplayFunctions" should "display traversable of tuples" in {
+
+ Console.withOut(stream) {
+ new DisplayTraversableFunctions[(String,String,Int)](testTuples).display("Login","Name","Age")
+ }
+
+ stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+ "jdoe\tJohn DOE\t32\n" +
+ "hsue\tHelen SUE\t27\n" +
+ "rsmith\tRichard SMITH\t45\n")
+ }
+
+ "DisplayFunctions" should "display traversable of case class" in {
+
+ Console.withOut(stream) {
+ new DisplayTraversableFunctions[Person](testPersons).display("Login","Name","Age")
+ }
+
+ stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+ "jdoe\tJohn DOE\t32\n" +
+ "hsue\tHelen SUE\t27\n" +
+ "rsmith\tRichard SMITH\t45\n")
+ }
+
+ "DisplayUtils" should "display HTML" in {
+ DisplayUtils.html() should be ("%html ")
+ DisplayUtils.html("test") should be ("%html test")
+ }
+
+ "DisplayUtils" should "display img" in {
+ DisplayUtils.img("http://www.google.com") should be ("<img
src='http://www.google.com' />")
+ DisplayUtils.img64() should be ("%img ")
+ DisplayUtils.img64("abcde") should be ("%img abcde")
+ }
+
+ override def afterEach() {
+ try super.afterEach() // To be stackable, must call super.afterEach
+ stream = null
+ }
+
+ after {
+ sc.stop()
+ }
+
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/23922ae4/zeppelin-interpreter/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index 980fe4a..c2a6752 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -36,6 +36,7 @@
<url>http://zeppelin.incubator.apache.org</url>
<dependencies>
+
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>