Repository: spark Updated Branches: refs/heads/master 7c07d176f -> 09b3c56c9
[SPARK-14752][SQL] Explicitly implement KryoSerialization for LazilyGenerateOrdering ## What changes were proposed in this pull request? This patch fixes a number of `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException` exceptions reported in [SPARK-15604], [SPARK-14752] etc. (while executing sparkSQL queries with the kryo serializer) by explicitly implementing `KryoSerialization` for `LazilyGenerateOrdering`. ## How was this patch tested? 1. Modified `OrderingSuite` so that all tests in the suite also test kryo serialization (for both interpreted and generated ordering). 2. Manually verified TPC-DS q1. Author: Sameer Agarwal <sam...@databricks.com> Closes #13466 from sameeragarwal/kryo. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09b3c56c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09b3c56c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09b3c56c Branch: refs/heads/master Commit: 09b3c56c91831b3e8d909521b8f3ffbce4eb0395 Parents: 7c07d17 Author: Sameer Agarwal <sam...@databricks.com> Authored: Thu Jun 2 10:58:00 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Thu Jun 2 10:58:00 2016 -0700 ---------------------------------------------------------------------- .../expressions/codegen/GenerateOrdering.scala | 14 +++++++++++++- .../sql/catalyst/expressions/OrderingSuite.scala | 14 ++++++++++---- 2 files changed, 23 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/09b3c56c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index c10829d..f4d35d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.expressions.codegen import java.io.ObjectInputStream +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -147,7 +150,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR /** * A lazily generated row ordering comparator. */ -class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] { +class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) + extends Ordering[InternalRow] with KryoSerializable { def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = this(ordering.map(BindReferences.bindReference(_, inputSchema))) @@ -163,6 +167,14 @@ class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[Int in.defaultReadObject() generatedOrdering = GenerateOrdering.generate(ordering) } + + override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException { + kryo.writeObject(out, ordering.toArray) + } + + override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException { + generatedOrdering = GenerateOrdering.generate(kryo.readObject(in, classOf[Array[SortOrder]])) + } } object LazilyGeneratedOrdering { http://git-wip-us.apache.org/repos/asf/spark/blob/09b3c56c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala index b190d3a..8cc2ab4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.expressions import scala.math._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering} import org.apache.spark.sql.types._ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -44,9 +45,14 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { case Ascending => signum(expected) case Descending => -1 * signum(expected) } + + val kryo = new KryoSerializer(new SparkConf).newInstance() val intOrdering = new InterpretedOrdering(sortOrder :: Nil) - val genOrdering = GenerateOrdering.generate(sortOrder :: Nil) - Seq(intOrdering, genOrdering).foreach { ordering => + val genOrdering = new LazilyGeneratedOrdering(sortOrder :: Nil) + val kryoIntOrdering = kryo.deserialize[InterpretedOrdering](kryo.serialize(intOrdering)) + val kryoGenOrdering = kryo.deserialize[LazilyGeneratedOrdering](kryo.serialize(genOrdering)) + + Seq(intOrdering, genOrdering, kryoIntOrdering, kryoGenOrdering).foreach { ordering => assert(ordering.compare(rowA, rowA) === 0) assert(ordering.compare(rowB, rowB) === 0) assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org