[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15007600#comment-15007600 ]

Apache Spark commented on SPARK-11583:
--------------------------------------

User 'davies' has created a pull request for this issue:
https://github.com/apache/spark/pull/9746

> Make MapStatus use less memory usage
> ------------------------------------
>                 Key: SPARK-11583
>                 URL: https://issues.apache.org/jira/browse/SPARK-11583
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler, Spark Core
>            Reporter: Kent Yao
>
> In the resolved issue https://issues.apache.org/jira/browse/SPARK-11271, as I
> said, using BitSet can save ≈20% memory compared to RoaringBitmap.
> For a Spark job containing quite a lot of tasks, 20% seems like a drop in the
> ocean. Essentially, BitSet uses a long[]; for example, a BitSet of 200k bits
> is a long[3125].
> So instead we can use a HashSet[Int] to store reduce ids (when non-empty
> blocks are dense, store the reduce ids of the empty blocks; when they are
> sparse, store the non-empty ones).
> For dense cases: if HashSet[Int](numNonEmptyBlocks).size <
> BitSet[totalBlockNum], I use MapStatusTrackingNoEmptyBlocks.
> For sparse cases: if HashSet[Int](numEmptyBlocks).size <
> BitSet[totalBlockNum], I use MapStatusTrackingEmptyBlocks.
> Sparse case (299/300 blocks are empty):
>   sc.makeRDD(1 to 3, 3000).groupBy(x=>x).top(5)
> Dense case (no block is empty):
>   sc.makeRDD(1 to 900, 3000).groupBy(x=>x).top(5)

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
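The size comparison in the description can be sketched numerically. This is a rough model (in Python for brevity): the BitSet figure follows directly from one bit per block packed into 64-bit longs, while the ~48 bytes per HashSet[Int] entry is an assumed ballpark for a JVM hash node plus boxed Integer, not a measured constant.

```python
# Rough memory model for the BitSet-vs-HashSet[Int] trade-off described in the
# issue. Figures approximate JVM heap payloads; object headers are ignored.

def bitset_bytes(total_blocks):
    """A long[]-backed BitSet needs one bit per block, rounded up to whole longs."""
    return (total_blocks + 63) // 64 * 8

def hashset_int_bytes(n_entries, bytes_per_entry=48):
    """A JVM HashSet[Int] costs very roughly ~48 bytes per entry
    (hash node + boxed Integer); the exact figure is JVM-dependent."""
    return n_entries * bytes_per_entry

# BitSet[200k] == long[3125], i.e. 25,000 bytes, regardless of density.
assert bitset_bytes(200_000) == 3125 * 8

# If only 10 blocks are on the minority side (empty or non-empty), a HashSet
# of their reduce ids is far smaller than the fixed-size BitSet.
assert hashset_int_bytes(10) < bitset_bytes(200_000)
```

The crossover point is what the description's `HashSet[Int](...).size < BitSet[totalBlockNum]` condition is testing: the hash set only wins while the minority side stays small relative to the total block count.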
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15004924#comment-15004924 ]

Reynold Xin commented on SPARK-11583:
--------------------------------------

I will get somebody to take a look at this and recommend a solution for 1.6. One possibility is just to revert the old patch.
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15003185#comment-15003185 ]

Imran Rashid commented on SPARK-11583:
--------------------------------------

[~lemire] After reading a couple of comments in the old PRs, I realized I can actually say a bit more about how we are using this.

(1) We only use these bitsets when there are more than 2k elements. The storage used with fewer than 2k elements is irrelevant (for now, in any case).

(2) When there are more than 2k elements in one of these bitsets, we actually expect there to be a lot of bitsets with the same number of elements: if one bitset has n elements, I'd expect there to be about n such bitsets (as [~Qin Yao] did in the tests above). This isn't a hard and fast rule. You could have only 1 copy, or many more than n copies. E.g., you might have 100k bitsets even if each bitset has only 2k elements; I'd actually say that is probably the more common case. So small savings in one bitset can get multiplied a lot.

(3) Though 2k is the hard lower bound, in theory there is no hard upper bound on the number of elements. But let's say for now that we want it to work for up to 100k elements.

(4) We don't have hard data on this, but I suspect that as the size goes up, so will the sparsity. E.g., I'd expect more empty blocks when there are 100k blocks than when there are 2k blocks. But still no guarantees.

(5) This is very hand-wavy, but IMO having a small number of empty blocks will be the far more common case. If we can save a lot of memory in that case, and pay a *small* (but well quantified) penalty in the worst case, it's probably worth doing.

Also, I'd like to get [~srowen]'s and [~rxin]'s opinions on possibly making this a blocker for 1.6. With the current code in the 1.6 branch, we have massively improved the worst-case memory usage, but made memory usage 2x that of 1.5 when there is a small number of empty blocks.
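The multiplication effect in point (2) is simple arithmetic, but it is the crux of the argument, so here is a sketch with purely illustrative numbers (the 20% figure is the one quoted in this thread; the 100k bitset count is the hypothetical from the comment above):

```python
# Why per-bitset savings multiply: there is one MapStatus bitmap per map task,
# so even a small saving scales with the number of tasks. Numbers illustrative.

def total_bytes(n_bitsets, bytes_each):
    return n_bitsets * bytes_each

per_bitset = 2048 // 8          # a plain 2k-bit set: 256 bytes of payload
saved_each = per_bitset // 5    # a ~20% per-bitset saving: 51 bytes

# Across 100k such bitsets, the aggregate saving is ~5 MB.
assert total_bytes(100_000, saved_each) == 5_100_000
```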
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15003084#comment-15003084 ]

Imran Rashid commented on SPARK-11583:
--------------------------------------

[~lemire] I thought Kent Yao's analysis was pretty reasonable (with a few additional suggestions I'll make below). But it sounds like that isn't what you were expecting -- can you suggest what else you have in mind? I think it would be better to do the same thing directly on the bitsets, without the additional complexity of Spark in the middle, but in general that is what I was looking for.

The truth of the matter is that, since Spark is meant to be very general purpose, we really can't say anything specific about what the data in these bitsets will be. They might be very full; they might be very sparse; there isn't any particular reason to suspect they'd be organized into runs (more than you'd expect by random chance). I understand that one format won't be better in all cases, but what I'm looking for is the format that works well for a wide range of loads *and* doesn't carry too big a penalty in the worst case. E.g., if the most memory-intensive cases use 20% more memory with Roaring than with a plain bitset, that is strong evidence against Roaring, even if it's much better in other cases. OTOH, if it's 1% in those cases, and 50% better in others, then we should probably still go with Roaring. From the analysis I did above, it was less than 1% overhead for alternating bits, which I thought would be the worst case. Are there other cases you think are important to look at in particular?

[~Qin Yao] Thanks so much for that analysis. That seems to agree with our expectations from the analysis above. The one case in which the Roaring bitmap is much worse can be explained by the lack of a call to {{runOptimize}}. Can I ask you to take your analysis a step further?

(a) Use RoaringBitmap 0.5.10.
(b) I think we'd be better off if you just looked at the bitsets directly, rather than going through Spark. For one thing, it's kind of confusing to talk about dense outputs from Spark leading to sparse bitmaps, because the code tracks the empty blocks.
(c) Given Daniel's explanation above of {{runOptimize}}, can you be sure to call {{runOptimize}} after adding all the bits to the Roaring bitmap? That would be the equivalent of adding it here in the Spark 1.5 code: https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L197
(d) Just to eliminate any doubts, can you measure the memory usage of the Roaring bitmap with the bits reversed? I understand that it *should* be the same, but we might as well double check.
(e) Can you add a case for alternating bits (and whatever else Daniel suggests as worst cases)?

In my mind, that should settle things. It would be great to have the code for that analysis attached here if possible, in case we ever revisit it. I have every reason to believe this will confirm that we should be using Roaring (with a call to {{runOptimize}} added), but let's make the case very clear.
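To see why alternating bits is the interesting worst case in suggestion (e), it helps to model the per-container sizes. The figures below follow the published Roaring format (array container: 2 bytes per value, usable up to 4096 values; bitmap container: 8192 bytes; run container: roughly 2 bytes plus 4 per run, which is what {{runOptimize}} enables); the model deliberately ignores the top-level key index and per-object overheads, so it is a sketch, not the library's exact accounting.

```python
# Back-of-the-envelope model of Roaring container sizes for one 2^16 chunk,
# showing why alternating bits defeats run compression. Sizes approximate the
# Roaring format: array = 2 B/value (<= 4096 values), bitmap = 8192 B,
# run = 2 B + 4 B per run. Top-level index and headers are ignored.

def container_bytes(values, use_runs=True):
    """Smallest container encoding for one chunk of sorted, distinct values."""
    card = len(values)
    array_sz = 2 * card
    bitmap_sz = 8192
    candidates = [array_sz if card <= 4096 else bitmap_sz, bitmap_sz]
    if use_runs:  # run containers are only chosen after runOptimize
        runs = 1 + sum(1 for a, b in zip(values, values[1:]) if b != a + 1)
        candidates.append(2 + 4 * runs)
    return min(candidates)

full_chunk = list(range(65536))          # completely full: a single run
alternating = list(range(0, 65536, 2))   # every other bit set: 32768 runs

assert container_bytes(full_chunk) == 6        # one run beats everything
assert container_bytes(alternating) == 8192    # falls back to a plain bitmap
```

For the alternating case the model degenerates to the 8192-byte bitmap container, which is exactly the payload of a plain 65536-bit BitSet, consistent with the "<1% overhead for alternating bits" observation above: only the fixed bookkeeping differs.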
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15003144#comment-15003144 ]

Daniel Lemire commented on SPARK-11583:
---------------------------------------

[~irashid] Sounds great. Should the code and data be made available, we can take it apart and extend it as needed.
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15001986#comment-15001986 ]

Apache Spark commented on SPARK-11583:
--------------------------------------

User 'yaooqinn' has created a pull request for this issue:
https://github.com/apache/spark/pull/9661
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15000187#comment-15000187 ]

Kent Yao commented on SPARK-11583:
----------------------------------

[~lemire] [~imranr]

1. Test cases
1.1 Sparse case (for each task, 10 blocks contain data, the others don't):
      sc.makeRDD(1 to 40950, 4095).groupBy(x=>x).top(5)
1.2 Dense case (for each task, most blocks contain data, few don't):
1.2.1 Full:
      sc.makeRDD(1 to 16769025, 4095).groupBy(x=>x).top(5)
1.2.2 Very dense (about 95 empty blocks):
      sc.makeRDD(1 to 1638, 4095).groupBy(x=>x).top(5)
1.3 Test tool: jmap -dump:format=b,file=heap.bin
1.4 Test branches: branch-1.5, master

2. Memory usage (Class Name | Objects | Shallow Heap | Retained Heap)
2.1 RoaringBitmap, sparse:
      org.apache.spark.scheduler.HighlyCompressedMapStatus | 4,095 | 131,040 | >= 34,135,920
      My explanation: 4095 * short[4095-10] = 4095 * 16 * 4085 / 8 ≈ 34,135,920
2.2.1 RoaringBitmap, full:
      org.apache.spark.scheduler.HighlyCompressedMapStatus | 4,095 | 131,040 | >= 360,360
      My explanation: RoaringBitmap(0)
2.2.2 RoaringBitmap, very dense:
      org.apache.spark.scheduler.HighlyCompressedMapStatus | 4,095 | 131,040 | >= 1,441,440
      My explanation: 4095 * short[95] = 4095 * 16 * 95 / 8 = 778,050 (+ others = 1,441,440)
2.3 BitSet, sparse:
      org.apache.spark.scheduler.HighlyCompressedMapStatus | 4,095 | 131,040 | >= 2,391,480
      My explanation: 4095 * 4095 = 16,769,025 bits (+ others = 2,391,480)
2.4 BitSet, full:
      org.apache.spark.scheduler.HighlyCompressedMapStatus | 4,095 | 131,040 | >= 2,391,480
      My explanation: same as above

3. Conclusion (memory usage):
   RoaringBitmap full < RoaringBitmap very dense < BitSet full = BitSet sparse < RoaringBitmap sparse
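The payload arithmetic behind the two key figures above can be recomputed directly. Note the measured "Retained Heap" values also include object headers and references, which is why they come out somewhat larger than the raw payload sizes below:

```python
# Recomputing the payload arithmetic behind the heap-dump figures above.
# Payload-only sizes; JVM object headers/references explain the gap to the
# measured Retained Heap.

SHORT_BITS, TASKS = 16, 4095

# RoaringBitmap, sparse case: each of the 4095 statuses stores 4085 block ids
# as 16-bit shorts in an array container.
roaring_sparse = TASKS * (4085 * SHORT_BITS // 8)

# BitSet: each status needs 4095 bits (rounded up to whole longs),
# regardless of density -- hence sparse and full measure the same.
bitset_any = TASKS * ((4095 + 63) // 64 * 8)

assert roaring_sparse == 33_456_150   # ~34 MB once overheads are added
assert bitset_any == 2_096_640        # ~2.4 MB once overheads are added
```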
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15000339#comment-15000339 ]

Daniel Lemire commented on SPARK-11583:
---------------------------------------

[~Qin Yao] [~aimran50] If you guys want to set up a documented benchmark with accompanying analysis, I'd be glad to help if my help is needed.

Ideally, a benchmark should be set up on interesting or representative workloads so that it is as relevant as possible to applications. No data structure, no matter which one, is always best in all possible scenarios. There are certainly cases where an uncompressed BitSet is the best choice, others where a hash set is best, and so forth. What matters is what works for the applications at hand.
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15001420#comment-15001420 ]

Kent Yao commented on SPARK-11583:
----------------------------------

Are the methods {noformat}r.add(0,20){noformat} and {noformat}runOptimize{noformat} unavailable in the version of Roaring that Spark currently uses?

According to my tests, I would say Roaring is better than BitSet, but it is not being used well enough. For most Spark tasks, dense cases are probably the usual ones. For the sparse cases, we could use Roaring with {noformat}runOptimize{noformat}, or just track the non-empty blocks.
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999009#comment-14999009 ]

Imran Rashid commented on SPARK-11583:
--------------------------------------

I think [~srowen] is right that we are going in circles a bit here; we don't just want to re-implement our own RoaringBitmap ... I have a feeling we actually made a misstep in https://issues.apache.org/jira/browse/SPARK-11271. I think there was some confusion about the way these structures work, as well as some misleading comments. After a bit of digging, here is my understanding, but please correct me (I would especially appreciate feedback from [~lemire]):

1. Given a relatively full set, {{RoaringBitmap}} is not going to do better than a {{BitSet}}. In fact, it will most likely use a bit more space, because it does some extra book-keeping beyond a normal {{BitSet}}. From https://issues.apache.org/jira/browse/SPARK-11271, it appears that in one case this is 20%. However, we don't really know whether that is the worst case, relatively typical, or maybe far worse in other cases. This might require a more thorough investigation from someone.

2. For sparse sets, a {{RoaringBitmap}} will use much less space than a {{BitSet}}, _including deserialized_. That is, the old comment in MapStatus, ["During serialization, this bitmap is compressed" | https://github.com/apache/spark/blob/6e823b4d7d52e9cf707f144256006e4575d23dc2/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L125], is very misleading: the bitmap is always compressed. The original implementation of {{HighlyCompressedMapStatus}} was mostly concerned with decreasing the size of the network traffic, but it is compressed in memory as well.

3. When a set is nearly full, {{RoaringBitmap}} does *not* automatically invert the bits in order to minimize space. Here's an example:

{noformat}
scala> import org.roaringbitmap._
import org.roaringbitmap._

scala> val rr = RoaringBitmap.bitmapOf(1,2,3,1000)
rr: org.roaringbitmap.RoaringBitmap = {1,2,3,1000}

scala> val x = rr.clone()
x: org.roaringbitmap.RoaringBitmap = {1,2,3,1000}

scala> x.flip(0,1001)

scala> rr.getSizeInBytes()
res1: Int = 22

scala> x.getSizeInBytes()
res2: Int = 2008
{noformat}

There is another comment in the old code: ["From a compression standpoint, it shouldn't matter whether we track empty or non-empty blocks" | https://github.com/apache/spark/blob/6e823b4d7d52e9cf707f144256006e4575d23dc2/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L177]. This was also a bit misleading. That comment was probably referring to when the data is serialized and sent over the network with another layer of compression enabled, in which case the two versions would be the same. But the *in-memory* usage can actually be very different.

So I'm pretty sure this means that after https://issues.apache.org/jira/browse/SPARK-11271, we are actually using much *more* memory when those sets are relatively empty -- that is, when most blocks are *non*-empty. We did at least reduce the worst-case memory usage that comes with a nearly full set (when most blocks are *empty*).

The only things Spark needs from this bitset are: 1) small size in memory, 2) small size serialized, and 3) fast {{contains()}}. {{RoaringBitmap}} is optimized for some other use cases, e.g. fast intersection & modification. Note that Spark doesn't even need mutability for the bitsets in {{MapStatus}}: after they are created, they are never changed. Nonetheless, {{RoaringBitmap}} might still be a good fit because it does compression.

I think our options are:
1. Roll our own compressed bit set -- basically what is in the current PR.
2. Go back to RoaringBitmap, but choose whether to store the empty or non-empty blocks based on what will use the least memory.
3. Look for some other pre-existing bitset implementation which is closer to our needs.

I'm leaning towards (1) and moving forward with the PR, but I thought it was worth clarifying the situation and making sure we understood what was going on.
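Option (2) above amounts to a one-bit-of-state wrapper around the bitmap. The sketch below illustrates the selection logic only; the class and member names are hypothetical (this is not Spark's actual API), and a plain Python set stands in for the compressed bitmap:

```python
# Sketch of option (2): track whichever side (empty or non-empty blocks) has
# fewer members, plus a flag recording which side is stored. A Python set
# stands in for the compressed bitmap; all names here are illustrative.

class BlockSet:
    def __init__(self, total_blocks, empty_blocks):
        empties = set(empty_blocks)
        # Store the minority side so the underlying bitmap stays small.
        self.tracks_empty = len(empties) <= total_blocks - len(empties)
        if self.tracks_empty:
            self.members = empties
        else:
            self.members = set(range(total_blocks)) - empties

    def is_empty(self, block_id):
        in_members = block_id in self.members
        return in_members if self.tracks_empty else not in_members

# Sparse output, as in the issue description: 299 of 300 blocks are empty,
# so only the single non-empty block id is stored.
s = BlockSet(total_blocks=300, empty_blocks=range(1, 300))
assert not s.tracks_empty
assert s.is_empty(5) and not s.is_empty(0)
```

Note that the flag must be carried through serialization, which is the main complexity this option adds over options (1) and (3).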
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999130#comment-14999130 ]

Daniel Lemire commented on SPARK-11583:
---------------------------------------

> So, this is about compressing the in-memory representation in some way,
> whereas RoaringBitmap compresses the external representation?

Probably not. Roaring bitmaps use about as much RAM as they use serialized bytes. Possibly Spark would need either to use a version of Roaring like Lucene's (which eagerly handles the flips) or to make use of our "runOptimize" method. See my answer below to [~irashid].
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage

[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999117#comment-14999117 ]

Daniel Lemire commented on SPARK-11583:
---------------------------------------

> When a set is nearly full, RoaringBitmap does not automatically invert the
> bits in order to minimize space.

The Roaring implementation in Lucene inverts bits to minimize space, as described in
https://github.com/apache/lucene-solr/blob/trunk/lucene/core/src/java/org/apache/lucene/util/RoaringDocIdSet.java

The RoaringBitmap library which we produced does not. However, it does something similar upon request. You might want to try:

{noformat}
x.flip(0,1001);
x.runOptimize();
x.getSizeInBytes();
{noformat}

The call to runOptimize should significantly reduce memory usage in this case. The intention is that users call "runOptimize" once their bitmap has been created and is no longer expected to change frequently. So "runOptimize" should always be called prior to serialization.
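To see concretely why runOptimize helps on the flip example from earlier in this thread: flipping [0, 1001) on {1,2,3,1000} leaves {0} ∪ {4..999}, which is only two runs. The sketch below models the container sizes (array container at 2 bytes per value vs. a run container at roughly 2 bytes plus 4 per run, following the Roaring format; exact library overheads are ignored):

```python
# Modeling why runOptimize shrinks the flipped bitmap: the flipped set
# collapses to two runs, so a run container (~2 B + 4 B/run) beats the
# array container (~2 B/value). Approximate payload sizes only.

def count_runs(sorted_vals):
    """Number of maximal runs of consecutive integers."""
    return 1 + sum(1 for a, b in zip(sorted_vals, sorted_vals[1:]) if b != a + 1)

original = [1, 2, 3, 1000]
flipped = sorted(set(range(1001)) - set(original))  # result of flip(0, 1001)

array_container = 2 * len(flipped)          # 997 values -> ~1994 bytes
run_container = 2 + 4 * count_runs(flipped) # 2 runs -> ~10 bytes

assert count_runs(flipped) == 2             # {0} and {4..999}
assert run_container < array_container
```

This matches the thread's measurements: ~2008 bytes for the flipped bitmap before runOptimize, and a dramatic reduction after.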
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory uage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999142#comment-14999142 ] Daniel Lemire commented on SPARK-11583: --- [~Qin Yao] wrote: "Roaringbitmap is same as the BitSet now we use in HiglyCompressedMapStatus, but take 20% memory usage more than BitSet. They both don't be compressed in-memory. According to the annotations of the former Roaring-HiglyCompressedMapStatus, it can be compressed during serialization not in-memory." I think that's a misunderstanding. Lucene and Apache Kylin use Roaring for in-memory bitmaps, and it saves a ton of memory. Druid uses them for memory-mapped bitmaps, and it compresses well. If you do flips, then it is possible that Roaring might end up being inefficient. Lucene has one approach to that, in RoaringBitmap, we offer the "runOptimize" function. > Make MapStatus use less memory uage > --- > > Key: SPARK-11583 > URL: https://issues.apache.org/jira/browse/SPARK-11583 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core >Reporter: Kent Yao > > In the resolved issue https://issues.apache.org/jira/browse/SPARK-11271, as I > said, using BitSet can save ≈20% memory usage compared to RoaringBitMap. > For a spark job contains quite a lot of tasks, 20% seems a drop in the ocean. > Essentially, BitSet uses long[]. For example a BitSet[200k] = long[3125]. > So if we use a HashSet[Int] to store reduceId (when non-empty blocks are > dense,use reduceId of empty blocks; when sparse, use non-empty ones). 
> For dense cases: if HashSet[Int](numNonEmptyBlocks).size < BitSet[totalBlockNum], I use MapStatusTrackingNoEmptyBlocks
> For sparse cases: if HashSet[Int](numEmptyBlocks).size < BitSet[totalBlockNum], I use MapStatusTrackingEmptyBlocks
>
> Sparse case, 299/300 blocks are empty:
> sc.makeRDD(1 to 3, 3000).groupBy(x=>x).top(5)
> Dense case, no block is empty:
> sc.makeRDD(1 to 900, 3000).groupBy(x=>x).top(5)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
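The description's sizing claim ({{BitSet[200k] = long[3125]}}) can be checked with plain word arithmetic; a minimal sketch (the 200k block count comes from the example above, the rest is just ceil-division over 64-bit words):

```java
// Checks the claim that BitSet[200k] = long[3125]: a bitset over n bits
// is backed by ceil(n / 64) longs of 8 bytes each.
public class BitSetSizing {
    public static void main(String[] args) {
        int totalBlocks = 200_000;
        int words = (totalBlocks + 63) / 64;   // 64-bit words backing the bitset
        int bytes = words * 8;                 // raw payload, ignoring object headers
        System.out.println(words + " longs, " + bytes + " bytes per map status");
        // prints: 3125 longs, 25000 bytes per map status
    }
}
```

So each highly compressed map status pays about 25 kB for its empty-block bitmap regardless of how many blocks are actually empty, which is what motivates the sparse/dense id-list variants.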
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999185#comment-14999185 ]

Imran Rashid commented on SPARK-11583:
--------------------------------------

thanks [~lemire]. So I guess that means option 2, going back to RoaringBitmap, would really just require us to insert a call to {{runOptimize}}; no need for us to worry about whether or not to flip the bits ourselves.

For the case of relatively full blocks, does the 20% overhead seem like a reasonable amount to you? That sounds super-high to me. If I understand correctly, the worst case for extra overhead is when all the blocks are dense? Furthermore, it seems like Roaring will actually save space unless the maximum element is exactly {{2^n - 1}}; otherwise the Roaring bitmap will still be smaller b/c it can make the final block smaller. (Well, I guess the condition isn't "exactly", it's a bit more subtle, but without going into too many specifics...) I actually tried with 2^17 - 1, and a {{java.util.BitSet}} saved less than 1% over Roaring. So I'm now more inclined to stick with RoaringBitmap and let it do its job (just making sure we use it correctly by adding a call to {{runOptimize}}).
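The BitSet side of the 2^17 - 1 experiment can be reproduced with the JDK alone (the Roaring half needs the org.roaringbitmap dependency, so only the uncompressed footprint is sketched here): {{java.util.BitSet}} allocates whole 64-bit words up to the highest set bit, so even a single element at 2^17 - 1 costs 2048 words.

```java
import java.util.BitSet;

// A single set bit at 2^17 - 1 forces the JDK BitSet to allocate
// 2048 words (16 KiB), the uncompressed worst case Imran describes.
public class BitSetWorstCase {
    public static void main(String[] args) {
        BitSet bs = new BitSet();
        bs.set((1 << 17) - 1);                 // one element at 131071
        // size() reports internal capacity in bits, rounded to whole words
        System.out.println(bs.size() + " bits of capacity, " + bs.size() / 8 + " bytes");
        // prints: 131072 bits of capacity, 16384 bytes
    }
}
```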
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999309#comment-14999309 ]

Daniel Lemire commented on SPARK-11583:
---------------------------------------

[~irashid] What I would suggest is a quantified benchmark. E.g., the Elastic people did something of the sort, comparing various formats including a BitSet, Roaring, and so forth; see https://www.elastic.co/blog/frame-of-reference-and-roaring-bitmaps

I'm available to help with this benchmark if it is needed...
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999835#comment-14999835 ]

Kent Yao commented on SPARK-11583:
----------------------------------

[~imranr] [~dlemire] The JVM flags I use: -Xmx256M -Xms32M

{noformat}
scala> import org.roaringbitmap.RoaringBitmap
import org.roaringbitmap.RoaringBitmap

scala> val rbm = new RoaringBitmap()
rbm: org.roaringbitmap.RoaringBitmap = {}

scala> for (i <- 1 to 20) rbm.add(i)

scala> val ar2 = Array.fill(20)(rbm)
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:2367)
	at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
	at java.lang.StringBuilder.append(StringBuilder.java:132)
	at org.roaringbitmap.RoaringBitmap.toString(RoaringBitmap.java:853)
	at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
	at scala.runtime.ScalaRunTime$$anonfun$arrayToString$1$2.apply(ScalaRunTime.scala:306)
	at scala.runtime.ScalaRunTime$$anonfun$arrayToString$1$2.apply(ScalaRunTime.scala:306)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at scala.runtime.ScalaRunTime$.arrayToString$1(ScalaRunTime.scala:306)
	at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:318)
	at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
	at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
	at .<init>(<console>:10)
	at .<clinit>(<console>)
	at $print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:601)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
	at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
{noformat}

{noformat}
scala> import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.collection.BitSet

scala> val bs = new BitSet(20)
bs: org.apache.spark.util.collection.BitSet = org.apache.spark.util.collection.BitSet@15b769b2

scala> for (i <- 0 to 19) bs.set(i)

scala> val ar = Array.fill(20)(bs)
ar: Array[org.apache.spark.util.collection.BitSet] = Array(org.apache.spark.util.collection.BitSet@15b769b2, org.apache.spark.util.collection.BitSet@15b769b2, org.apache.spark.util.collection.BitSet@15b769b2, org.apache.spark.util.collection.BitSet@15b769b2, org.apache.spark.util.collection.BitSet@15b769b2, org.apache.spark.util.collection.BitSe...

scala>
{noformat}
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999848#comment-14999848 ]

Imran Rashid commented on SPARK-11583:
--------------------------------------

Hi [~Qin Yao] -- unfortunately that example is OOMing for a different reason. The scala shell is just trying to call {{toString}} on the roaring bitmap 200k times, building a string that is too big. It works for the BitSet just b/c {{bitset.toString()}} returns a short string (the object id) instead of a full list of the set bits.

Also, that test doesn't really test creating 200k bitsets -- the array is just storing many references to the same object. (You can see in the toString of the bitset array that the same object id is repeated over and over.) You need to do something more like

{noformat}
val ar = Array.fill(20){
  val bs = new BitSet(20)
  for (i <- 0 to 19) bs.set(i)
  bs
}
{noformat}

and similarly for the roaring bitmap. Though there really isn't any need to wrap things in an array; we are just interested in the memory of one object, which you can measure without going to OOM, e.g. with {{jmap}}. (Incidentally, both versions lead to OOM for me when you actually create 200k different objects.)
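The shared-reference pitfall Imran points out is easy to demonstrate with the JDK alone (a sketch; the array length and bitset size here are arbitrary stand-ins):

```java
import java.util.Arrays;
import java.util.BitSet;

// Array.fill(n)(bs) in the Scala REPL stores n references to ONE bitset;
// measuring per-status memory requires a distinct object in every slot.
public class SharedVsDistinct {
    public static void main(String[] args) {
        BitSet one = new BitSet(64);
        BitSet[] shared = new BitSet[4];
        Arrays.fill(shared, one);                       // many pointers, one object
        System.out.println(shared[0] == shared[1]);     // true: identical references

        BitSet[] distinct = new BitSet[4];
        for (int i = 0; i < distinct.length; i++) {
            distinct[i] = new BitSet(64);               // fresh allocation per slot
        }
        System.out.println(distinct[0] == distinct[1]); // false: separate objects
    }
}
```

Only the second array actually multiplies the memory cost by the array length, which is why the first REPL session measured almost nothing until {{toString}} blew up.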
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999882#comment-14999882 ]

Daniel Lemire commented on SPARK-11583:
---------------------------------------

[~irashid] Calling a function that many times to set consecutive values would be wasteful. With Java's BitSet, you could call...

{code:java}
bs.set(0,20)
{code}

The memory usage would be the same (about 25kB) but the construction time would be much less... Similarly, on a {{RoaringBitmap}} object you could do:

{code:java}
r.add(0,20)
{code}

The object would be automatically compressed down to a few bytes (much less than a BitSet). It is an instance where a {{RoaringBitmap}} would probably use three orders of magnitude less memory. If you instead add the values one by one, you can also call {{runOptimize}} on the result (at the very end, or a few times along the way). Again, the RoaringBitmap would use thousands of times less memory.

This is, by the way, a perfect example of a case where you want neither a hash set nor an uncompressed bitset.
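The range-set call has a direct JDK counterpart that can be checked end to end (the {{0,20}} bounds in the quoted comment look truncated; the 200,000 used below is my own stand-in chosen to match the ~25 kB figure):

```java
import java.util.BitSet;

// Bit-at-a-time vs. one half-open range call on the JDK BitSet:
// identical resulting set, far fewer method calls.
public class RangeSet {
    public static void main(String[] args) {
        BitSet loop = new BitSet();
        for (int i = 0; i < 200_000; i++) loop.set(i);   // one call per bit

        BitSet range = new BitSet();
        range.set(0, 200_000);                           // one call for [0, 200000)

        System.out.println(loop.equals(range));          // prints: true
        System.out.println(range.cardinality());         // prints: 200000
    }
}
```

The uncompressed footprint is the same either way for the JDK BitSet; Roaring's run containers are what turn the contiguous range into a few bytes.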
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999897#comment-14999897 ]

Daniel Lemire commented on SPARK-11583:
---------------------------------------

[~Qin Yao] The RoaringBitmap library is able to store a set containing all integers in an arbitrary range like [0, 20) using only a few bytes, orders of magnitude less than a Java BitSet.
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996206#comment-14996206 ]

Kent Yao commented on SPARK-11583:
----------------------------------

1. The _Compressed_ in MapStatus means *Array[Long]* -> *Array[Byte]* -> *Long(1)* + BitSet(Array.length), corresponding to RealSize -> CompressedMapStatus -> HighlyCompressedMapStatus; this is the in-memory representation. RoaringBitmap plays the same role as the BitSet we now use in HighlyCompressedMapStatus, but takes 20% more memory than BitSet. Neither is compressed in memory. According to the annotations on the former Roaring-based HighlyCompressedMapStatus, it can be compressed during serialization, not in memory.

2. I will try to optimize my code. Maybe it only needs more constructors for HighlyCompressedMapStatus rather than a new class.

3. The map statuses are fetched by executors during the next stage: if a recorded size is > 0, the executor fetches the corresponding shuffle result produced by the former stage; if it is 0, it does not. If we can realize this with a much smaller map status, we can avoid driver OOM and reduce network I/O. Here we separate two cases (sparse/dense) and store a small number of Ints, the reduce ids the blocks belong to; as long as reduceIds[nonEmptyBlocks] or reduceIds[emptyBlocks] takes fewer bits than BitSet[totalBlocks], this optimization applies. The thresholds are 1/32 and 31/32; for occupancy between 1/32 and 31/32, the BitSet is smaller. Here 32 is the size of an Int in bits.
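The 1/32 and 31/32 thresholds above can be sketched as a small chooser (the method name, signature, and representation labels are my own, not Spark's): an Int id costs 32 bits, a bitset costs one bit per block, so an id list only wins below 1/32 or above 31/32 occupancy.

```java
// Sketch of the sparse/dense selection rule: compare 32 bits per stored id
// against 1 bit per block for the plain bitset.
public class MapStatusChooser {
    static String choose(int totalBlocks, int numNonEmpty) {
        int numEmpty = totalBlocks - numNonEmpty;
        if (32L * numNonEmpty < totalBlocks) return "track-non-empty"; // < 1/32 full
        if (32L * numEmpty < totalBlocks) return "track-empty";        // > 31/32 full
        return "bitset";                                               // in between
    }

    public static void main(String[] args) {
        System.out.println(choose(3000, 3));     // prints: track-non-empty (sparse)
        System.out.println(choose(3000, 2999));  // prints: track-empty (dense)
        System.out.println(choose(3000, 1500));  // prints: bitset (middle ground)
    }
}
```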
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996266#comment-14996266 ]

Sean Owen commented on SPARK-11583:
-----------------------------------

Yeah, a lot of this depends on the memory savings vs. correctness and speed. You'd probably want to keep this simple, since it needs good tests; better to reuse existing code and not create parallel implementations just to cover an implementation detail.
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996415#comment-14996415 ]

Kent Yao commented on SPARK-11583:
----------------------------------

I have reviewed my code and merged the two classes. I'm now unsure whether to fold the new class into HighlyCompressedMapStatus: most methods of the two classes have the same names but different implementations, and if they are combined there will obviously be a lot of if/elses... Could you review my code and give me some advice?
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996147#comment-14996147 ]

Kent Yao commented on SPARK-11583:
----------------------------------

[~srowen] MapStatus stores the BlockManagerId of where it is located and an Array[Long] of shuffle result sizes. One of its implementations, HighlyCompressedMapStatus, compresses the Array[Long] down to a single *long* (the avgSize) plus a BitSet[Array.size] marking the empty blocks. For a large number of tasks such as 200k, BitSet[200k] is about 24.4 KB; when 200k map statuses are passed to the driver, that is 200k * ~24.4 KB ≈ 4.66 GB, and each executor will fetch them later.

What if only a few blocks contain results, or only a few blocks are empty (maybe only 10)? If we use a container that stores the _reduceId: Int_ values, such a map status may take only 10*32 bits, which means the driver may only receive about 8 MB of data. Am I right?
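The driver-side totals in that comment can be recomputed directly (a sketch; note a 200k-bit bitset is 25,000 bytes ≈ 24.4 KiB per status, and ten 4-byte reduce ids per status give the sparse alternative):

```java
// Aggregate driver memory for 200k map statuses: per-status bitset bytes
// times status count, vs. a ten-entry Int id list per status.
public class DriverTotals {
    public static void main(String[] args) {
        long statuses = 200_000;
        long bitsetBytes = ((200_000 + 63) / 64) * 8L;   // 25_000 bytes per status
        long totalBitset = statuses * bitsetBytes;       // 5_000_000_000 bytes
        System.out.printf("bitsets: %.2f GiB%n", totalBitset / (double) (1L << 30));
        // prints: bitsets: 4.66 GiB

        long idListBytes = 10 * 4L;                      // ten 32-bit reduce ids
        long totalIds = statuses * idListBytes;          // 8_000_000 bytes
        System.out.printf("id lists: %.1f MB%n", totalIds / 1e6);
        // prints: id lists: 8.0 MB
    }
}
```

The three-orders-of-magnitude gap between the two totals is the whole argument for the sparse representation when almost every block is empty (or almost none are).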
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996098#comment-14996098 ]

Apache Spark commented on SPARK-11583:
--------------------------------------

User 'yaooqinn' has created a pull request for this issue:
https://github.com/apache/spark/pull/9559
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996109#comment-14996109 ]

Sean Owen commented on SPARK-11583:
-----------------------------------

[~Qin Yao] but you're just re-describing a sparse set representation, which is what we removed. The argument was that it didn't help enough here; see the original PR. I am not sure this is the issue at all, anyway.
[jira] [Commented] (SPARK-11583) Make MapStatus use less memory usage
[ https://issues.apache.org/jira/browse/SPARK-11583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996168#comment-14996168 ]

Sean Owen commented on SPARK-11583:
-----------------------------------

Yes, I understand the idea of a 'compressed' bitset being smaller when it's very sparse. So this is about compressing the in-memory representation in some way, whereas RoaringBitmap compressed the external representation? Does this really need a new class? You're just varying the internal representation of a class. Are you/we pretty sure this comes up often enough to justify it, and is there a runtime performance hit?