Repository: spark
Updated Branches:
  refs/heads/branch-1.6 90d71bff0 -> 64439f7d6


Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"

This reverts commit e209fa271ae57dc8849f8b1241bf1ea7d6d3d62c.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64439f7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64439f7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64439f7d

Branch: refs/heads/branch-1.6
Commit: 64439f7d61edfbaa5513ddcf3f9dc86fa905aef2
Parents: 90d71bf
Author: Davies Liu <davies....@gmail.com>
Authored: Mon Nov 16 14:50:38 2015 -0800
Committer: Davies Liu <davies....@gmail.com>
Committed: Mon Nov 16 14:51:25 2015 -0800

----------------------------------------------------------------------
 core/pom.xml                                    |  4 ++
 .../org/apache/spark/scheduler/MapStatus.scala  | 13 +++---
 .../spark/serializer/KryoSerializer.scala       | 10 +++-
 .../apache/spark/util/collection/BitSet.scala   | 28 ++---------
 .../spark/serializer/KryoSerializerSuite.scala  |  6 +++
 .../spark/util/collection/BitSetSuite.scala     | 49 --------------------
 pom.xml                                         |  5 ++
 7 files changed, 33 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 7e1205a..37e3f16 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -178,6 +178,10 @@
       <artifactId>lz4</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1..1efce12 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
+import org.roaringbitmap.RoaringBitmap
+
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.Utils
 
 /**
@@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus(
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
-    private[this] var emptyBlocks: BitSet,
+    private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
@@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private (
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (emptyBlocks.get(reduceId)) {
+    if (emptyBlocks.contains(reduceId)) {
       0
     } else {
       avgSize
@@ -160,7 +161,7 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    emptyBlocks = new BitSet
+    emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
   }
@@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus {
     // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
     // blocks. From a performance standpoint, we benefit from tracking empty blocks because
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+    val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val emptyBlocks = new BitSet(totalNumBlocks)
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
         totalSize += size
       } else {
-        emptyBlocks.set(i)
+        emptyBlocks.add(i)
       }
       i += 1
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f..c5195c1 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
 
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
@@ -38,7 +39,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
 import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
-import org.apache.spark.util.collection.{BitSet, CompactBuffer}
+import org.apache.spark.util.collection.CompactBuffer
 
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
@@ -362,7 +363,12 @@ private[serializer] object KryoSerializer {
     classOf[StorageLevel],
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
-    classOf[BitSet],
+    classOf[RoaringBitmap],
+    classOf[RoaringArray],
+    classOf[RoaringArray.Element],
+    classOf[Array[RoaringArray.Element]],
+    classOf[ArrayContainer],
+    classOf[BitmapContainer],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Byte]],

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 85c5bdb..7ab67fc 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -17,21 +17,14 @@
 
 package org.apache.spark.util.collection
 
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-
-import org.apache.spark.util.{Utils => UUtils}
-
-
 /**
  * A simple, fixed-size bit set implementation. This implementation is fast because it avoids
  * safety/bound checking.
  */
-class BitSet(private[this] var numBits: Int) extends Externalizable {
+class BitSet(numBits: Int) extends Serializable {
 
-  private var words = new Array[Long](bit2words(numBits))
-  private def numWords = words.length
-
-  def this() = this(0)
+  private val words = new Array[Long](bit2words(numBits))
+  private val numWords = words.length
 
   /**
    * Compute the capacity (number of bits) that can be represented
@@ -237,19 +230,4 @@ class BitSet(private[this] var numBits: Int) extends Externalizable {
 
   /** Return the number of longs it would take to hold numBits. */
   private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
-
-  override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException {
-    out.writeInt(numBits)
-    words.foreach(out.writeLong(_))
-  }
-
-  override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException {
-    numBits = in.readInt()
-    words = new Array[Long](bit2words(numBits))
-    var index = 0
-    while (index < words.length) {
-      words(index) = in.readLong()
-      index += 1
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index afe2e80..e428414 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -322,6 +322,12 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     val conf = new SparkConf(false)
     conf.set("spark.kryo.registrationRequired", "true")
 
+    // these cases require knowing the internals of RoaringBitmap a little.  Blocks span 2^16
+    // values, and they use a bitmap (dense) if they have more than 4096 values, and an
+    // array (sparse) if they use less.  So we just create two cases, one sparse and one dense.
+    // and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly
+    // empty blocks
+
     val ser = new KryoSerializer(conf).newInstance()
     val denseBlockSizes = new Array[Long](5000)
     val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
index b0db098..69dbfa9 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.util.collection
 
-import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
-
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.util.{Utils => UUtils}
 
 class BitSetSuite extends SparkFunSuite {
 
@@ -155,50 +152,4 @@ class BitSetSuite extends SparkFunSuite {
     assert(bitsetDiff.nextSetBit(85) === 85)
     assert(bitsetDiff.nextSetBit(86) === -1)
   }
-
-  test("read and write externally") {
-    val tempDir = UUtils.createTempDir()
-    val outputFile = File.createTempFile("bits", null, tempDir)
-
-    val fos = new FileOutputStream(outputFile)
-    val oos = new ObjectOutputStream(fos)
-
-    // Create BitSet
-    val setBits = Seq(0, 9, 1, 10, 90, 96)
-    val bitset = new BitSet(100)
-
-    for (i <- 0 until 100) {
-      assert(!bitset.get(i))
-    }
-
-    setBits.foreach(i => bitset.set(i))
-
-    for (i <- 0 until 100) {
-      if (setBits.contains(i)) {
-        assert(bitset.get(i))
-      } else {
-        assert(!bitset.get(i))
-      }
-    }
-    assert(bitset.cardinality() === setBits.size)
-
-    bitset.writeExternal(oos)
-    oos.close()
-
-    val fis = new FileInputStream(outputFile)
-    val ois = new ObjectInputStream(fis)
-
-    // Read BitSet from the file
-    val bitset2 = new BitSet(0)
-    bitset2.readExternal(ois)
-
-    for (i <- 0 until 100) {
-      if (setBits.contains(i)) {
-        assert(bitset2.get(i))
-      } else {
-        assert(!bitset2.get(i))
-      }
-    }
-    assert(bitset2.cardinality() === setBits.size)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01afa80..2a8a445 100644
--- a/pom.xml
+++ b/pom.xml
@@ -635,6 +635,11 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>org.roaringbitmap</groupId>
+        <artifactId>RoaringBitmap</artifactId>
+        <version>0.4.5</version>
+      </dependency>
+      <dependency>
         <groupId>commons-net</groupId>
         <artifactId>commons-net</artifactId>
         <version>2.2</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to