[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14999009#comment-14999009 ]
Imran Rashid commented on SPARK-11583:
--------------------------------------
I think [~srowen] is right that we are going in circles a bit here; we don't
just want to re-implement our own RoaringBitmap ... I have a feeling we
actually made a misstep in https://issues.apache.org/jira/browse/SPARK-11271.
I think there was some confusion about the way these structures work, as well
as some misleading comments. After a bit of digging, here is my understanding,
but please correct me (I would especially appreciate feedback from [~lemire]):
1. Given a relatively full set, {{RoaringBitmap}} is not going to do better
than a {{BitSet}}. In fact, it will most likely use a bit more space, because
it's trying to do some extra book-keeping beyond a normal {{BitSet}}. From
https://issues.apache.org/jira/browse/SPARK-11271, it appears in one case this
overhead is 20%. However, we don't really know whether that is the worst case,
relatively typical, or whether it could be far worse in other cases. This might
require a more thorough investigation from someone.
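For a rough feel of the dense case, here is a sketch (it assumes only the same
{{RoaringBitmap}} calls used in the example further down; the {{BitSet}} figure
is just {{numBits / 8}}, since a {{BitSet}} is backed by a {{long[]}}):
{noformat}
scala> import org.roaringbitmap._
scala> val dense = new RoaringBitmap()
scala> (0 until 200000 by 2).foreach(i => dense.add(i))  // every other bit set: dense, no long runs
scala> dense.getSizeInBytes()  // each full 2^16 chunk becomes an 8 KB bitmap container, plus book-keeping
scala> 200000 / 8              // a plain BitSet over the same range: 25000 bytes
{noformat}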
2. For sparse sets, a {{RoaringBitmap}} will use much less space than a
{{BitSet}}, _including deserialized_. That is, the old comment in MapStatus,
["During serialization, this bitmap is compressed" |
https://github.com/apache/spark/blob/6e823b4d7d52e9cf707f144256006e4575d23dc2/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L125]
is very misleading -- the bitmap is always compressed. The original
implementation of {{HighlyCompressedMapStatus}} was mostly concerned with
decreasing the size of the network traffic, but the bitmap is also compressed in memory.
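The sparse case is easy to see in the REPL (a sketch; exact byte counts will
vary a bit by version):
{noformat}
scala> import org.roaringbitmap._
scala> val sparse = RoaringBitmap.bitmapOf(1, 2, 3, 1000000)
scala> sparse.getSizeInBytes()  // a few dozen bytes, in memory and serialized
scala> 1000001 / 8              // a BitSet spanning the same range: ~125 KB, even in memory
{noformat}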
3. When a set is nearly full, {{RoaringBitmap}} does *not* automatically invert
the bits in order to minimize space. Here's an example:
{noformat}
scala> import org.roaringbitmap._
import org.roaringbitmap._
scala> val rr = RoaringBitmap.bitmapOf(1,2,3,1000)
rr: org.roaringbitmap.RoaringBitmap = {1,2,3,1000}
scala> val x = rr.clone()
x: org.roaringbitmap.RoaringBitmap = {1,2,3,1000}
scala> x.flip(0,1001)
scala> rr.getSizeInBytes()
res1: Int = 22
scala> x.getSizeInBytes()
res2: Int = 2008
{noformat}
(The 2008 bytes above also make sense: after the flip, {{x}} has 997 bits set,
and {{RoaringBitmap}} stores them as an array of 16-bit shorts, roughly 2 bytes
per set bit, while {{rr}} only pays for its 4 set bits.)
There is another comment in the old code: ["From a compression standpoint, it
shouldn't matter whether we track empty or non-empty blocks" |
https://github.com/apache/spark/blob/6e823b4d7d52e9cf707f144256006e4575d23dc2/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L177].
This was also a bit misleading. That comment was probably referring to the data
as serialized and sent over the network, with another layer of compression
enabled, in which case the two versions would end up about the same size. But
the *in-memory* usage can be very different.
So I'm pretty sure this means that after
https://issues.apache.org/jira/browse/SPARK-11271, we are actually using much
*more* memory when the tracked sets are relatively empty -- that is, when most
blocks are *non*-empty. We did at least reduce the worst-case memory usage that
comes with a nearly full set (when most blocks are *empty*).
The only things Spark needs from this bitset are:
1) small size in-memory
2) small size serialized
3) fast {{contains()}}
{{RoaringBitmap}} is optimized for some other use cases, e.g. fast intersection
and modification. Note that Spark doesn't even need mutability for the bitsets
in {{MapStatus}} -- after they are created, they are never changed.
Nonetheless, {{RoaringBitmap}} might still be a good fit because it does
compression.
I think our options are:
1. Roll our own compressed bit set -- basically what is in the current PR
2. Go back to RoaringBitmap, but choose whether to store the empty or non-empty
blocks based on what will use the least memory (see the sketch after this list).
3. Look for some other pre-existing bitset implementation which is closer to
our needs.
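To make option (2) concrete, here is a minimal sketch of the idea (the class
and method names are hypothetical, not from any existing PR; it only assumes the
{{RoaringBitmap}} calls shown in the REPL example above):
{noformat}
import org.roaringbitmap.RoaringBitmap

// Hypothetical sketch: build both the set and its complement, keep whichever is smaller.
class InvertibleBitSet private (bitmap: RoaringBitmap, inverted: Boolean) {
  // If we stored the complement, flip the answer (XOR with the inversion flag).
  def contains(i: Int): Boolean = bitmap.contains(i) != inverted
}

object InvertibleBitSet {
  def apply(members: Iterable[Int], universeSize: Int): InvertibleBitSet = {
    val direct = new RoaringBitmap()
    members.foreach(i => direct.add(i))
    val complement = direct.clone()
    complement.flip(0, universeSize)  // same flip as in the REPL example above
    if (direct.getSizeInBytes <= complement.getSizeInBytes) {
      new InvertibleBitSet(direct, inverted = false)
    } else {
      new InvertibleBitSet(complement, inverted = true)
    }
  }
}
{noformat}
The choice is made once at construction, which fits the immutable usage in
{{MapStatus}}, and {{contains()}} only pays one extra boolean comparison.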
I'm leaning towards (1), and moving forward with the PR, but I thought it was
worth clarifying the situation and making sure we understood what was going on.
> Make MapStatus use less memory
> ------------------------------
>
> Key: SPARK-11583
> URL: https://issues.apache.org/jira/browse/SPARK-11583
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler, Spark Core
> Reporter: Kent Yao
>
> In the resolved issue https://issues.apache.org/jira/browse/SPARK-11271, as I
> said, using BitSet can save ≈20% memory usage compared to RoaringBitMap.
> For a Spark job that contains quite a lot of tasks, 20% seems like a drop in the ocean.
> Essentially, BitSet uses a long[]; for example, a BitSet over 200k bits is a long[3125].
> So we can use a HashSet[Int] to store reduceIds instead (when non-empty blocks are
> dense, store the reduceIds of empty blocks; when sparse, store the non-empty ones).
> For dense cases: if HashSet[Int](numNonEmptyBlocks).size <
> BitSet[totalBlockNum], I use MapStatusTrackingNoEmptyBlocks
> For sparse cases: if HashSet[Int](numEmptyBlocks).size <
> BitSet[totalBlockNum], I use MapStatusTrackingEmptyBlocks
> sparse case, 299/300 are empty
> sc.makeRDD(1 to 30000, 3000).groupBy(x=>x).top(5)
> dense case, no block is empty
> sc.makeRDD(1 to 9000000, 3000).groupBy(x=>x).top(5)