spark git commit: Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"

2015-11-16 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 90d71bff0 -> 64439f7d6


Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of 
RoaringBitmap to reduce memory usage"

This reverts commit e209fa271ae57dc8849f8b1241bf1ea7d6d3d62c.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64439f7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64439f7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64439f7d

Branch: refs/heads/branch-1.6
Commit: 64439f7d61edfbaa5513ddcf3f9dc86fa905aef2
Parents: 90d71bf
Author: Davies Liu 
Authored: Mon Nov 16 14:50:38 2015 -0800
Committer: Davies Liu 
Committed: Mon Nov 16 14:51:25 2015 -0800

--
 core/pom.xml                                    |  4 ++
 .../org/apache/spark/scheduler/MapStatus.scala  | 13 +++---
 .../spark/serializer/KryoSerializer.scala       | 10 +++-
 .../apache/spark/util/collection/BitSet.scala   | 28 ++-
 .../spark/serializer/KryoSerializerSuite.scala  |  6 +++
 .../spark/util/collection/BitSetSuite.scala     | 49 ----
 pom.xml                                         |  5 ++
 7 files changed, 33 insertions(+), 82 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 7e1205a..37e3f16 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -178,6 +178,10 @@
       <artifactId>lz4</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1..1efce12 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
+import org.roaringbitmap.RoaringBitmap
+
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.Utils
 
 /**
@@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus(
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
-    private[this] var emptyBlocks: BitSet,
+    private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
@@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private (

   override def location: BlockManagerId = loc

   override def getSizeForBlock(reduceId: Int): Long = {
-    if (emptyBlocks.get(reduceId)) {
+    if (emptyBlocks.contains(reduceId)) {
       0
     } else {
       avgSize
@@ -160,7 +161,7 @@ private[spark] class HighlyCompressedMapStatus private (

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    emptyBlocks = new BitSet
+    emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
   }
@@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus {
     // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
     // blocks. From a performance standpoint, we benefit from tracking empty blocks because
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+    val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val emptyBlocks = new BitSet(totalNumBlocks)
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
         totalSize += size
       } else {
-        emptyBlocks.set(i)
+        emptyBlocks.add(i)
       }
       i += 1
     }
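
For context, the hunk above restores the pattern of recording only the indices of empty blocks in a RoaringBitmap and answering size queries against it. A minimal self-contained sketch of that pattern (the standalone object and variable names are illustrative, not the exact Spark code):

    import org.roaringbitmap.RoaringBitmap

    object EmptyBlockTrackingSketch {
      def main(args: Array[String]): Unit = {
        // Illustrative block sizes: mostly non-empty, a few empty.
        val uncompressedSizes = Array[Long](100, 0, 250, 0, 175)

        // Record only the empty block indices, as in the restored code above.
        val emptyBlocks = new RoaringBitmap()
        var numNonEmptyBlocks = 0
        var totalSize = 0L
        var i = 0
        while (i < uncompressedSizes.length) {
          if (uncompressedSizes(i) > 0) {
            numNonEmptyBlocks += 1
            totalSize += uncompressedSizes(i)
          } else {
            emptyBlocks.add(i)
          }
          i += 1
        }
        val avgSize = if (numNonEmptyBlocks > 0) totalSize / numNonEmptyBlocks else 0L

        // Mirrors getSizeForBlock: 0 for a tracked-empty block, avgSize otherwise.
        def sizeForBlock(reduceId: Int): Long =
          if (emptyBlocks.contains(reduceId)) 0L else avgSize

        assert(sizeForBlock(1) == 0L)
        assert(sizeForBlock(0) == avgSize)
      }
    }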

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f..c5195c1 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esote
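
The KryoSerializer hunk above is truncated in the archive; broadly, this part of the revert re-adds Kryo support for RoaringBitmap. As a hypothetical sketch only (the class and registration below are illustrative, not the restored Spark code), one way to serialize a RoaringBitmap under Kryo is to delegate to the bitmap's own DataOutput/DataInput serialization, relying on Kryo's Output/Input extending java.io.OutputStream/InputStream:

    import java.io.{DataInputStream, DataOutputStream}

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.roaringbitmap.RoaringBitmap

    // Hypothetical sketch, not the restored Spark code: delegate to
    // RoaringBitmap.serialize/deserialize, wrapping Kryo's streams.
    class RoaringBitmapKryoSerializer extends Serializer[RoaringBitmap] {
      override def write(kryo: Kryo, output: Output, bitmap: RoaringBitmap): Unit = {
        val dataOut = new DataOutputStream(output)
        bitmap.serialize(dataOut)
        dataOut.flush()
      }

      override def read(kryo: Kryo, input: Input, cls: Class[RoaringBitmap]): RoaringBitmap = {
        val bitmap = new RoaringBitmap()
        bitmap.deserialize(new DataInputStream(input))
        bitmap
      }
    }

    // Registration would then look like:
    //   kryo.register(classOf[RoaringBitmap], new RoaringBitmapKryoSerializer)

The same revert was also applied to master, as the following message shows.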

spark git commit: Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"

2015-11-16 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 985b38dd2 -> 3c025087b


Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of 
RoaringBitmap to reduce memory usage"

This reverts commit e209fa271ae57dc8849f8b1241bf1ea7d6d3d62c.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c025087
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c025087
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c025087

Branch: refs/heads/master
Commit: 3c025087b58f475a9bcb5c8f4b2b2df804915b2b
Parents: 985b38d
Author: Davies Liu 
Authored: Mon Nov 16 14:50:38 2015 -0800
Committer: Davies Liu 
Committed: Mon Nov 16 14:50:38 2015 -0800

--
 core/pom.xml                                    |  4 ++
 .../org/apache/spark/scheduler/MapStatus.scala  | 13 +++---
 .../spark/serializer/KryoSerializer.scala       | 10 +++-
 .../apache/spark/util/collection/BitSet.scala   | 28 ++-
 .../spark/serializer/KryoSerializerSuite.scala  |  6 +++
 .../spark/util/collection/BitSetSuite.scala     | 49 ----
 pom.xml                                         |  5 ++
 7 files changed, 33 insertions(+), 82 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3c025087/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 7e1205a..37e3f16 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -178,6 +178,10 @@
       <artifactId>lz4</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/3c025087/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1..1efce12 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
+import org.roaringbitmap.RoaringBitmap
+
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.Utils
 
 /**
@@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus(
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
-    private[this] var emptyBlocks: BitSet,
+    private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
@@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private (

   override def location: BlockManagerId = loc

   override def getSizeForBlock(reduceId: Int): Long = {
-    if (emptyBlocks.get(reduceId)) {
+    if (emptyBlocks.contains(reduceId)) {
       0
     } else {
       avgSize
@@ -160,7 +161,7 @@ private[spark] class HighlyCompressedMapStatus private (

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    emptyBlocks = new BitSet
+    emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
   }
@@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus {
     // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
     // blocks. From a performance standpoint, we benefit from tracking empty blocks because
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+    val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val emptyBlocks = new BitSet(totalNumBlocks)
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
         totalSize += size
       } else {
-        emptyBlocks.set(i)
+        emptyBlocks.add(i)
       }
       i += 1
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c025087/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f..c5195c1 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftw