Repository: spark
Updated Branches:
  refs/heads/branch-1.3 0ba759985 -> 825499655


[SPARK-6465][SQL] Fix serialization of GenericRowWithSchema using kryo

Author: Michael Armbrust <mich...@databricks.com>

Closes #5191 from marmbrus/kryoRowsWithSchema and squashes the following commits:

bb83522 [Michael Armbrust] Fix serialization of GenericRowWithSchema using kryo
f914f16 [Michael Armbrust] Add no arg constructor to GenericRowWithSchema

(cherry picked from commit f88f51bbd461e0a42ad7021147268509b9c3c56e)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82549965
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82549965
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82549965

Branch: refs/heads/branch-1.3
Commit: 8254996557512b8bbc8fd35c550004b56144581f
Parents: 0ba7599
Author: Michael Armbrust <mich...@databricks.com>
Authored: Thu Mar 26 18:46:57 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Mar 26 18:47:15 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/expressions/rows.scala     |  7 +++++--
 .../org/apache/spark/sql/types/Metadata.scala     |  3 +++
 .../org/apache/spark/sql/types/dataTypes.scala    | 18 ++++++++++++++++++
 .../spark/sql/execution/SparkSqlSerializer.scala  |  4 +---
 .../scala/org/apache/spark/sql/RowSuite.scala     | 12 ++++++++++++
 5 files changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 8bba26b..a8983df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -66,7 +66,7 @@ object EmptyRow extends Row {
  */
 class GenericRow(protected[sql] val values: Array[Any]) extends Row {
   /** No-arg constructor for serialization. */
-  def this() = this(null)
+  protected def this() = this(null)
 
   def this(size: Int) = this(new Array[Any](size))
 
@@ -172,11 +172,14 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
 
 class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
   extends GenericRow(values) {
+
+  /** No-arg constructor for serialization. */
+  protected def this() = this(null, null)
 }
 
 class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
   /** No-arg constructor for serialization. */
-  def this() = this(null)
+  protected def this() = this(null)
 
   def this(size: Int) = this(new Array[Any](size))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index e50e976..6ee24ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -41,6 +41,9 @@ import org.apache.spark.annotation.DeveloperApi
 sealed class Metadata private[types] (private[types] val map: Map[String, Any])
   extends Serializable {
 
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   /** Tests whether this Metadata contains a binding for a key. */
   def contains(key: String): Boolean = map.contains(key)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index d973144..952cf5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -670,6 +670,10 @@ case class PrecisionInfo(precision: Int, scale: Int)
  */
 @DeveloperApi
 case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends 
FractionalType {
+
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   private[sql] type JvmType = Decimal
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { 
typeTag[JvmType] }
   private[sql] val numeric = Decimal.DecimalIsFractional
@@ -819,6 +823,10 @@ object ArrayType {
  */
 @DeveloperApi
 case class ArrayType(elementType: DataType, containsNull: Boolean) extends 
DataType {
+
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null, false)
+
   private[sql] def buildFormattedString(prefix: String, builder: 
StringBuilder): Unit = {
     builder.append(
       s"$prefix-- element: ${elementType.typeName} (containsNull = 
$containsNull)\n")
@@ -857,6 +865,9 @@ case class StructField(
     nullable: Boolean = true,
     metadata: Metadata = Metadata.empty) {
 
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null, null)
+
   private[sql] def buildFormattedString(prefix: String, builder: 
StringBuilder): Unit = {
     builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = 
$nullable)\n")
     DataType.buildFormattedString(dataType, s"$prefix    |", builder)
@@ -1003,6 +1014,9 @@ object StructType {
 @DeveloperApi
 case class StructType(fields: Array[StructField]) extends DataType with 
Seq[StructField] {
 
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   /** Returns all field names in an array. */
   def fieldNames: Array[String] = fields.map(_.name)
 
@@ -1121,6 +1135,10 @@ case class MapType(
     keyType: DataType,
     valueType: DataType,
     valueContainsNull: Boolean) extends DataType {
+
+  /** No-arg constructor for kryo. */
+  def this() = this(null, null, false)
+
   private[sql] def buildFormattedString(prefix: String, builder: 
StringBuilder): Unit = {
     builder.append(s"$prefix-- key: ${keyType.typeName}\n")
     builder.append(s"$prefix-- value: ${valueType.typeName} " +

http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index c4534fd..967bd76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHa
 
 private[sql] class SparkSqlSerializer(conf: SparkConf) extends 
KryoSerializer(conf) {
   override def newKryo(): Kryo = {
-    val kryo = new Kryo()
+    val kryo = super.newKryo()
     kryo.setRegistrationRequired(false)
     kryo.register(classOf[MutablePair[_, _]])
     
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
@@ -57,8 +57,6 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
     kryo.register(classOf[Decimal])
 
     kryo.setReferences(false)
-    kryo.setClassLoader(Utils.getSparkClassLoader)
-    new AllScalaRegistrar().apply(kryo)
     kryo
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82549965/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index f5b945f..36465cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.scalatest.FunSuite
 
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, 
SpecificMutableRow}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext.implicits._
 import org.apache.spark.sql.types._
 
 class RowSuite extends FunSuite {
@@ -50,4 +53,13 @@ class RowSuite extends FunSuite {
     row(0) = null
     assert(row.isNullAt(0))
   }
+
+  test("serialize w/ kryo") {
+    val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first()
+    val serializer = new 
SparkSqlSerializer(TestSQLContext.sparkContext.getConf)
+    val instance = serializer.newInstance()
+    val ser = instance.serialize(row)
+    val de = instance.deserialize(ser).asInstanceOf[Row]
+    assert(de === row)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to