Repository: spark
Updated Branches:
  refs/heads/master 7c07d176f -> 09b3c56c9


[SPARK-14752][SQL] Explicitly implement KryoSerialization for 
LazilyGeneratedOrdering

## What changes were proposed in this pull request?

This patch fixes the `com.esotericsoftware.kryo.KryoException: 
java.lang.NullPointerException` failures reported in [SPARK-15604], 
[SPARK-14752], and elsewhere (hit while executing Spark SQL queries with the 
Kryo serializer) by explicitly implementing `KryoSerializable` in 
`LazilyGeneratedOrdering`.
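
For context, a minimal sketch of the kind of setup under which the failures were 
reported (the app name and the `range(...).sort(...)` query are illustrative only, 
not taken from the linked reports):

```scala
// Sketch: Spark configured to serialize with Kryo via spark.serializer.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kryo-ordering-repro")  // hypothetical app name
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

// A query that sorts rows ships an ordering to the executors; with Kryo enabled,
// deserializing LazilyGeneratedOrdering could previously fail with
// com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException.
spark.range(100).sort("id").collect()
```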

## How was this patch tested?

1. Modified `OrderingSuite` so that every test in the suite also exercises Kryo 
serialization (for both the interpreted and the generated ordering); a standalone 
round-trip sketch follows this list.
2. Manually verified TPC-DS q1.
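
Below is a minimal, self-contained sketch of the Kryo round-trip the updated suite 
performs; the column layout and sort order here are made up for illustration and do 
not come from the diff:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, SortOrder}
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.types.IntegerType

// Order rows by their first (int) column, ascending.
val sortOrder = SortOrder(BoundReference(0, IntegerType, nullable = true), Ascending)

// Round-trip the generated ordering through Kryo, as the updated suite does.
val kryo = new KryoSerializer(new SparkConf).newInstance()
val ordering = new LazilyGeneratedOrdering(sortOrder :: Nil)
val roundTripped = kryo.deserialize[LazilyGeneratedOrdering](kryo.serialize(ordering))

// The deserialized ordering must compare rows exactly like the original;
// before this patch, the Kryo round-trip itself hit the NullPointerException.
val rowA = InternalRow(1)
val rowB = InternalRow(2)
assert(roundTripped.compare(rowA, rowB) == ordering.compare(rowA, rowB))
```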

Author: Sameer Agarwal <sam...@databricks.com>

Closes #13466 from sameeragarwal/kryo.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09b3c56c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09b3c56c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09b3c56c

Branch: refs/heads/master
Commit: 09b3c56c91831b3e8d909521b8f3ffbce4eb0395
Parents: 7c07d17
Author: Sameer Agarwal <sam...@databricks.com>
Authored: Thu Jun 2 10:58:00 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Jun 2 10:58:00 2016 -0700

----------------------------------------------------------------------
 .../expressions/codegen/GenerateOrdering.scala        | 14 +++++++++++++-
 .../sql/catalyst/expressions/OrderingSuite.scala      | 14 ++++++++++----
 2 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/09b3c56c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index c10829d..f4d35d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.expressions.codegen
 
 import java.io.ObjectInputStream
 
+import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
+import com.esotericsoftware.kryo.io.{Input, Output}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -147,7 +150,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
 /**
  * A lazily generated row ordering comparator.
  */
-class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+class LazilyGeneratedOrdering(val ordering: Seq[SortOrder])
+  extends Ordering[InternalRow] with KryoSerializable {
 
   def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
     this(ordering.map(BindReferences.bindReference(_, inputSchema)))
@@ -163,6 +167,14 @@ class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[Int
     in.defaultReadObject()
     generatedOrdering = GenerateOrdering.generate(ordering)
   }
+
+  override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException {
+    kryo.writeObject(out, ordering.toArray)
+  }
+
+  override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
+  generatedOrdering = GenerateOrdering.generate(kryo.readObject(in, classOf[Array[SortOrder]]))
+  }
 }
 
 object LazilyGeneratedOrdering {

http://git-wip-us.apache.org/repos/asf/spark/blob/09b3c56c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
index b190d3a..8cc2ab4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.expressions
 
 import scala.math._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
+import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering}
 import org.apache.spark.sql.types._
 
 class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -44,9 +45,14 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
           case Ascending => signum(expected)
           case Descending => -1 * signum(expected)
         }
+
+        val kryo = new KryoSerializer(new SparkConf).newInstance()
         val intOrdering = new InterpretedOrdering(sortOrder :: Nil)
-        val genOrdering = GenerateOrdering.generate(sortOrder :: Nil)
-        Seq(intOrdering, genOrdering).foreach { ordering =>
+        val genOrdering = new LazilyGeneratedOrdering(sortOrder :: Nil)
+        val kryoIntOrdering = kryo.deserialize[InterpretedOrdering](kryo.serialize(intOrdering))
+        val kryoGenOrdering = kryo.deserialize[LazilyGeneratedOrdering](kryo.serialize(genOrdering))
+
+        Seq(intOrdering, genOrdering, kryoIntOrdering, kryoGenOrdering).foreach { ordering =>
           assert(ordering.compare(rowA, rowA) === 0)
           assert(ordering.compare(rowB, rowB) === 0)
           assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult)

