Repository: spark
Updated Branches:
  refs/heads/master 6c20f3529 -> 1f1fccc5c


[SPARK-5949] HighlyCompressedMapStatus needs more classes registered w/ kryo

https://issues.apache.org/jira/browse/SPARK-5949

Author: Imran Rashid <iras...@cloudera.com>

Closes #4877 from squito/SPARK-5949_register_roaring_bitmap and squashes the 
following commits:

7e13316 [Imran Rashid] style style style
5f6bb6d [Imran Rashid] more style
709bfe0 [Imran Rashid] style
a5cb744 [Imran Rashid] update tests to cover both types of 
RoaringBitmapContainers
09610c6 [Imran Rashid] formatting
f9a0b7c [Imran Rashid] put primitive array registrations together
97beaf8 [Imran Rashid] SPARK-5949 HighlyCompressedMapStatus needs more classes 
registered w/ kryo


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f1fccc5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f1fccc5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f1fccc5

Branch: refs/heads/master
Commit: 1f1fccc5ceb0c5b7656a0594be3a67bd3b432e85
Parents: 6c20f35
Author: Imran Rashid <iras...@cloudera.com>
Authored: Tue Mar 3 15:33:19 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Mar 3 15:33:19 2015 -0800

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       | 15 ++++++++++---
 .../spark/serializer/KryoSerializerSuite.scala  | 23 ++++++++++++++++++--
 2 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1f1fccc5/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 02158aa..9ce64d4 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -20,22 +20,23 @@ package org.apache.spark.serializer
 import java.io.{EOFException, InputStream, OutputStream}
 import java.nio.ByteBuffer
 
+import scala.reflect.ClassTag
+
 import com.esotericsoftware.kryo.{Kryo, KryoException}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => 
KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
+import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, 
RoaringBitmap}
 
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
 import org.apache.spark.broadcast.HttpBroadcast
-import org.apache.spark.network.nio.{PutBlock, GotBlock, GetBlock}
+import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
 import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus}
 import org.apache.spark.storage._
 import org.apache.spark.util.BoundedPriorityQueue
 import org.apache.spark.util.collection.CompactBuffer
 
-import scala.reflect.ClassTag
-
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo 
serialization library]].
  *
@@ -202,9 +203,17 @@ private[serializer] object KryoSerializer {
     classOf[GetBlock],
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
+    classOf[RoaringBitmap],
+    classOf[RoaringArray],
+    classOf[RoaringArray.Element],
+    classOf[Array[RoaringArray.Element]],
+    classOf[ArrayContainer],
+    classOf[BitmapContainer],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
+    classOf[Array[Short]],
+    classOf[Array[Long]],
     classOf[BoundedPriorityQueue[_]],
     classOf[SparkConf]
   )

http://git-wip-us.apache.org/repos/asf/spark/blob/1f1fccc5/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index a70f67a..523d898 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -23,9 +23,10 @@ import scala.reflect.ClassTag
 import com.esotericsoftware.kryo.Kryo
 import org.scalatest.FunSuite
 
-import org.apache.spark.{SparkConf, SharedSparkContext}
+import org.apache.spark.{SharedSparkContext, SparkConf}
+import org.apache.spark.scheduler.HighlyCompressedMapStatus
 import org.apache.spark.serializer.KryoTest._
-
+import org.apache.spark.storage.BlockManagerId
 
 class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
@@ -242,6 +243,24 @@ class KryoSerializerSuite extends FunSuite with 
SharedSparkContext {
       ser.newInstance().deserialize[ClassLoaderTestingObject](bytes)
     }
   }
+
+  test("registration of HighlyCompressedMapStatus") {
+    val conf = new SparkConf(false)
+    conf.set("spark.kryo.registrationRequired", "true")
+
+    // these cases require knowing the internals of RoaringBitmap a little.  
Blocks span 2^16
+    // values, and they use a bitmap (dense) if they have more than 4096 
values, and an
+    // array (sparse) if they use less.  So we just create two cases, one 
sparse and one dense.
+    // and we use a roaring bitmap for the empty blocks, so we trigger the 
dense case w/ mostly
+    // empty blocks
+
+    val ser = new KryoSerializer(conf).newInstance()
+    val denseBlockSizes = new Array[Long](5000)
+    val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
+    Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes =>
+      ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 
1234), blockSizes))
+    }
+  }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to