Repository: spark
Updated Branches:
  refs/heads/branch-1.6 11a11f0ff -> 0665fb5ea


[SPARK-11636][SQL] Support classes defined in the REPL with Encoders

#theScaryParts (i.e. changes to the repl, executor classloaders and codegen)...

Author: Michael Armbrust <mich...@databricks.com>
Author: Yin Huai <yh...@databricks.com>

Closes #9825 from marmbrus/dataset-replClasses2.

(cherry picked from commit 4b84c72dfbb9ddb415fee35f69305b5d7b280891)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0665fb5e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0665fb5e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0665fb5e

Branch: refs/heads/branch-1.6
Commit: 0665fb5eae931ee93e320da9fedcfd6649ed004e
Parents: 11a11f0
Author: Michael Armbrust <mich...@databricks.com>
Authored: Fri Nov 20 15:17:17 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Nov 20 15:17:31 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/repl/SparkIMain.scala      | 14 ++++++++----
 .../scala/org/apache/spark/repl/ReplSuite.scala | 24 ++++++++++++++++++++
 .../apache/spark/repl/ExecutorClassLoader.scala |  8 ++++++-
 .../expressions/codegen/CodeGenerator.scala     |  4 ++--
 4 files changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0665fb5e/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 4ee605f..829b122 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -1221,10 +1221,16 @@ import org.apache.spark.annotation.DeveloperApi
         )
       }
 
-      val preamble = """
-        |class %s extends Serializable {
-        |  %s%s%s
-      """.stripMargin.format(lineRep.readName, envLines.map("  " + _ + ";\n").mkString, importsPreamble, indentCode(toCompute))
+      val preamble = s"""
+        |class ${lineRep.readName} extends Serializable {
+        |  ${envLines.map("  " + _ + ";\n").mkString}
+        |  $importsPreamble
+        |
+        |  // If we need to construct any objects defined in the REPL on an executor we will need
+        |  // to pass the outer scope to the appropriate encoder.
+        |  org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
+        |  ${indentCode(toCompute)}
+      """.stripMargin
       val postamble = importsTrailer + "\n}" + "\n" +
         "object " + lineRep.readName + " {\n" +
         "  val INSTANCE = new " + lineRep.readName + "();\n" +

http://git-wip-us.apache.org/repos/asf/spark/blob/0665fb5e/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 5674dcd..081aa03 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -262,6 +262,9 @@ class ReplSuite extends SparkFunSuite {
         |import sqlContext.implicits._
         |case class TestCaseClass(value: Int)
         |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
+        |
+        |// Test Dataset Serialization in the REPL
+        |Seq(TestCaseClass(1)).toDS().collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -278,6 +281,27 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("java.lang.ClassNotFoundException", output)
   }
 
+  test("Datasets and encoders") {
+    val output = runInterpreter("local",
+      """
+        |import org.apache.spark.sql.functions._
+        |import org.apache.spark.sql.Encoder
+        |import org.apache.spark.sql.expressions.Aggregator
+        |import org.apache.spark.sql.TypedColumn
+        |val simpleSum = new Aggregator[Int, Int, Int] with Serializable {
+        |  def zero: Int = 0                     // The initial value.
+        |  def reduce(b: Int, a: Int) = b + a    // Add an element to the running total
+        |  def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
+        |  def finish(b: Int) = b                // Return the final result.
+        |}.toColumn
+        |
+        |val ds = Seq(1, 2, 3, 4).toDS()
+        |ds.select(simpleSum).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
   test("SPARK-2632 importing a method from non serializable class and not using it.") {
     val output = runInterpreter("local",
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/0665fb5e/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 3d2d235..a976e96 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -65,7 +65,13 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
           case e: ClassNotFoundException => {
             val classOption = findClassLocally(name)
             classOption match {
-              case None => throw new ClassNotFoundException(name, e)
+              case None =>
+                // If this class has a cause, it will break the internal assumption of Janino
+                // (the compiler used for Spark SQL code-gen).
+                // See org.codehaus.janino.ClassLoaderIClassLoader's findIClass, you will see
+                // its behavior will be changed if there is a cause and the compilation
+                // of generated class will fail.
+                throw new ClassNotFoundException(name)
               case Some(a) => a
             }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/0665fb5e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 1b7260c..2f3d6ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.{MapData, ArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types._
-
+import org.apache.spark.util.Utils
 
 /**
  * Java source for evaluating an [[Expression]] given a [[InternalRow]] of input.
@@ -536,7 +536,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
    */
   private[this] def doCompile(code: String): GeneratedClass = {
     val evaluator = new ClassBodyEvaluator()
-    evaluator.setParentClassLoader(getClass.getClassLoader)
+    evaluator.setParentClassLoader(Utils.getContextOrSparkClassLoader)
     // Cannot be under package codegen, or fail with java.lang.InstantiationException
     evaluator.setClassName("org.apache.spark.sql.catalyst.expressions.GeneratedClass")
     evaluator.setDefaultImports(Array(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to