Re: SizeEstimator

2018-02-27 Thread David Capwell
Thanks for the reply, and sorry for my delayed response; I had to go find the
profile data to look up the class again.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

That class tracks its memory usage with SizeEstimator and has a field "map"
which buffers the rows.  In my case the buffer was > 1 million rows, so the
estimate became costly every time it was checked.


This can be reproduced: create a random data set of (string, long), then
group by the string (I believe this is what the code did first; there was a
sort later, but that should have been a different stage).  Make sure the
number of executors is small (for example, only one), otherwise you reduce N
(the number of buffered rows) for each executor.
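Here is a minimal sketch of that reproduction (the row count, key length, and
local[1] master are assumptions; tune them so the per-executor buffer grows
past a million rows and the size estimation shows up in a profiler):

    import org.apache.spark.sql.SparkSession
    import scala.util.Random

    object SizeEstimatorRepro {
      def main(args: Array[String]): Unit = {
        // Single local executor thread so all rows land in one in-memory buffer.
        val spark = SparkSession.builder()
          .appName("size-estimator-repro")
          .master("local[1]")
          .getOrCreate()

        val numRows = 2000000  // assumed; pick something comfortably > 1 million
        val data = spark.sparkContext
          .parallelize(1 to numRows, numSlices = 4)
          .map(_ => (Random.alphanumeric.take(16).mkString, Random.nextLong()))

        // groupByKey buffers rows in memory before spilling, which is where
        // SizeEstimator gets invoked repeatedly on the growing buffer.
        println(data.groupByKey().count())

        spark.stop()
      }
    }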

On Mon, Feb 26, 2018, 10:04 PM 叶先进 <advance...@gmail.com> wrote:

> What type is the buffer you mentioned?
>
>
> On 27 Feb 2018, at 11:46 AM, David Capwell <dcapw...@gmail.com> wrote:
>
> advancedxy <advance...@gmail.com>, I don't remember the code as well
> anymore, but what we hit was a very simple schema (string, long). The issue
> is that the buffer had a million of these, so SizeEstimator had to keep
> recalculating the same elements over and over again.  SizeEstimator was
> on-CPU about 30% of the time; bounding the buffer got it to < 5% (going
> off memory, so I may be off).
>
> The class info (the size of its fields as laid out on the heap) is cached
> for every class encountered, so the size info for the same class of elements
> is not recalculated. However, for a Collection class (or similar),
> SizeEstimator will scan all the elements in the container (the `next` field
> in LinkedList, for example).
>
> Arrays are a special case: SizeEstimator will sample an array if
> array.length > ARRAY_SIZE_FOR_SAMPLING (400).
>
> The cost is really (assuming memory access is O(1), which is not true)
> O(N × M), where N is the number of rows in the buffer and M is the size of
> the schema.  My case could be solved by not recomputing, which would bring
> the cost down to O(M), since the bookkeeping should be constant time. There
> was logic to delay recalculating based on a change in frequency, but that
> didn't really do much for us; bounding and spilling was the bigger win in
> our case.
>
> On Mon, Feb 26, 2018, 7:24 PM Xin Liu <xin.e@gmail.com> wrote:
>
>> Thanks David. Another solution is to convert the protobuf object to a byte
>> array; that does speed up SizeEstimator.
>>
>> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell <dcapw...@gmail.com>
>> wrote:
>>
>>> This is used to predict the current cost of memory so Spark knows whether
>>> to flush or not. This is very costly for us, so we use a flag marked in
>>> the code as private to lower the cost:
>>>
>>> spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no
>>> typo) - how many records before a flush.
>>>
>>> This lowers the cost because it lets us leave data in the young
>>> generation; if we don't bound it, everything gets promoted to old and GC
>>> becomes an issue.  This doesn't solve the fact that the walk is slow, but
>>> it lowers the cost of GC.  For us, we make sure to have spare memory on
>>> the system for the page cache, so spilling to disk is a memory write 99%
>>> of the time.  If your host has less free memory, spilling may become more
>>> expensive.
>>>
>>>
>>> If the walk is your bottleneck and not GC then I would recommend JOL and
>>> guessing to better predict memory.
>>>
>>> On Mon, Feb 26, 2018, 4:47 PM Xin Liu <xin.e....@gmail.com> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> We have a situation where shuffled data is protobuf-based, and
>>>> SizeEstimator is taking a lot of time.
>>>>
>>>> We have tried to override SizeEstimator to return a constant value,
>>>> which speeds things up a lot.
>>>>
>>>> My question: what is the side effect of disabling SizeEstimator? Does
>>>> Spark just do memory reallocation, or are there more severe consequences?
>>>>
>>>> Thanks!
>>>>
>>>
>>
>


Re: SizeEstimator

2018-02-26 Thread 叶先进
What type is the buffer you mentioned?


> On 27 Feb 2018, at 11:46 AM, David Capwell <dcapw...@gmail.com> wrote:
> 
> advancedxy <advance...@gmail.com>, I don't remember the code as well
> anymore, but what we hit was a very simple schema (string, long). The issue
> is that the buffer had a million of these, so SizeEstimator had to keep
> recalculating the same elements over and over again.  SizeEstimator was
> on-CPU about 30% of the time; bounding the buffer got it to < 5% (going
> off memory, so I may be off).
> 
The class info (the size of its fields as laid out on the heap) is cached for
every class encountered, so the size info for the same class of elements is not
recalculated. However, for a Collection class (or similar), SizeEstimator will
scan all the elements in the container (the `next` field in LinkedList, for
example).

Arrays are a special case: SizeEstimator will sample an array if
array.length > ARRAY_SIZE_FOR_SAMPLING (400).
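As a rough illustration (assuming Spark is on the classpath; SizeEstimator is a
developer API), a linked container is walked element by element while a large
array is sampled:

    import org.apache.spark.util.SizeEstimator
    import java.util.LinkedList

    object EstimateDemo {
      def main(args: Array[String]): Unit = {
        // Every node of the LinkedList is visited through its `next` field,
        // so the cost of the estimate grows with the number of elements.
        val list = new LinkedList[String]()
        (1 to 100000).foreach(i => list.add(s"row-$i"))
        println(s"linked list estimate: ${SizeEstimator.estimate(list)} bytes")

        // length > ARRAY_SIZE_FOR_SAMPLING (400), so only a sample of the
        // elements is inspected and the total is extrapolated.
        val arr = Array.tabulate(1000000)(i => s"row-$i")
        println(s"array estimate: ${SizeEstimator.estimate(arr)} bytes")
      }
    }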
> The cost is really (assuming memory access is O(1), which is not true)
> O(N × M), where N is the number of rows in the buffer and M is the size of
> the schema.  My case could be solved by not recomputing, which would bring the
> cost down to O(M), since the bookkeeping should be constant time. There was
> logic to delay recalculating based on a change in frequency, but that didn't
> really do much for us; bounding and spilling was the bigger win in our case.
> 
> 
> On Mon, Feb 26, 2018, 7:24 PM Xin Liu <xin.e@gmail.com> wrote:
> Thanks David. Another solution is to convert the protobuf object to a byte
> array; that does speed up SizeEstimator.
> 
> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell <dcapw...@gmail.com> wrote:
> This is used to predict the current cost of memory so Spark knows whether to
> flush or not. This is very costly for us, so we use a flag marked in the code
> as private to lower the cost:
> 
> spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no typo) -
> how many records before a flush.
> 
> This lowers the cost because it lets us leave data in the young generation;
> if we don't bound it, everything gets promoted to old and GC becomes an
> issue.  This doesn't solve the fact that the walk is slow, but it lowers the
> cost of GC.  For us, we make sure to have spare memory on the system for the
> page cache, so spilling to disk is a memory write 99% of the time.  If your
> host has less free memory, spilling may become more expensive.
> 
> 
> If the walk is your bottleneck and not GC then I would recommend JOL and 
> guessing to better predict memory.  
> 
> On Mon, Feb 26, 2018, 4:47 PM Xin Liu <xin.e@gmail.com> wrote:
> Hi folks,
> 
> We have a situation where shuffled data is protobuf-based, and SizeEstimator
> is taking a lot of time.
> 
> We have tried to override SizeEstimator to return a constant value, which
> speeds things up a lot.
> 
> My question: what is the side effect of disabling SizeEstimator? Does Spark
> just do memory reallocation, or are there more severe consequences?
> 
> Thanks!
> 



Re: SizeEstimator

2018-02-26 Thread David Capwell
advancedxy <advance...@gmail.com>, I don't remember the code as well
anymore, but what we hit was a very simple schema (string, long). The issue
is that the buffer had a million of these, so SizeEstimator had to keep
recalculating the same elements over and over again.  SizeEstimator was
on-CPU about 30% of the time; bounding the buffer got it to < 5% (going off
memory, so I may be off).

The cost is really (assuming memory access is O(1), which is not true)
O(N × M), where N is the number of rows in the buffer and M is the size of
the schema.  My case could be solved by not recomputing, which would bring
the cost down to O(M), since the bookkeeping should be constant time. There
was logic to delay recalculating based on a change in frequency, but that
didn't really do much for us; bounding and spilling was the bigger win in
our case.
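For illustration, here is a rough sketch (not Spark's actual code) of that
delay-and-extrapolate bookkeeping: the expensive SizeEstimator walk runs only
at exponentially spaced update counts, and in between the size is extrapolated
from a bytes-per-update rate, so the per-insert cost stays roughly constant.
The class name and the 1.1 growth factor are assumptions for the example.

    // Sketch of sampled size tracking: call afterUpdate() on every insert,
    // read estimateSize() whenever a current estimate is needed.
    class SampledSizeTracker[T](estimate: T => Long) {
      private var numUpdates = 0L
      private var nextSampleAt = 1L
      private var bytesPerUpdate = 0.0
      private var lastSampleSize = 0L
      private var lastSampleUpdates = 0L

      def afterUpdate(collection: T): Unit = {
        numUpdates += 1
        if (numUpdates >= nextSampleAt) {
          val size = estimate(collection)                    // the expensive walk
          val deltaUpdates = math.max(1L, numUpdates - lastSampleUpdates)
          bytesPerUpdate = (size - lastSampleSize).toDouble / deltaUpdates
          lastSampleSize = size
          lastSampleUpdates = numUpdates
          nextSampleAt = math.ceil(numUpdates * 1.1).toLong  // back off exponentially
        }
      }

      def estimateSize(): Long =
        lastSampleSize + (bytesPerUpdate * (numUpdates - lastSampleUpdates)).toLong
    }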

On Mon, Feb 26, 2018, 7:24 PM Xin Liu <xin.e@gmail.com> wrote:

> Thanks David. Another solution is to convert the protobuf object to a byte
> array; that does speed up SizeEstimator.
>
> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell <dcapw...@gmail.com> wrote:
>
>> This is used to predict the current cost of memory so Spark knows whether
>> to flush or not. This is very costly for us, so we use a flag marked in the
>> code as private to lower the cost:
>>
>> spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no
>> typo) - how many records before a flush.
>>
>> This lowers the cost because it lets us leave data in the young generation;
>> if we don't bound it, everything gets promoted to old and GC becomes an
>> issue.  This doesn't solve the fact that the walk is slow, but it lowers the
>> cost of GC.  For us, we make sure to have spare memory on the system for the
>> page cache, so spilling to disk is a memory write 99% of the time.  If your
>> host has less free memory, spilling may become more expensive.
>>
>>
>> If the walk is your bottleneck and not GC then I would recommend JOL and
>> guessing to better predict memory.
>>
>> On Mon, Feb 26, 2018, 4:47 PM Xin Liu <xin.e@gmail.com> wrote:
>>
>>> Hi folks,
>>>
>>> We have a situation where shuffled data is protobuf-based, and
>>> SizeEstimator is taking a lot of time.
>>>
>>> We have tried to override SizeEstimator to return a constant value,
>>> which speeds things up a lot.
>>>
>>> My question: what is the side effect of disabling SizeEstimator? Does
>>> Spark just do memory reallocation, or are there more severe consequences?
>>>
>>> Thanks!
>>>
>>
>


Re: SizeEstimator

2018-02-26 Thread Xin Liu
Thanks David. Another solution is to convert the protobuf object to a byte
array; that does speed up SizeEstimator.
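A minimal sketch of that workaround (the generic helper and its names are
assumptions, not an existing API): serialize each protobuf before the shuffle
so SizeEstimator only sees a flat byte[], and re-parse on the other side.

    import com.google.protobuf.{Message, Parser}
    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    object ProtoShuffle {
      // Shuffle serialized bytes instead of deep protobuf object graphs; a
      // byte[] is cheap for SizeEstimator to size. `parser` would be
      // YourProto.parser() from the generated class.
      def groupByKeyAsBytes[K: ClassTag, V <: Message : ClassTag](
          rdd: RDD[(K, V)], parser: Parser[V]): RDD[(K, Iterable[V])] = {
        rdd.mapValues(_.toByteArray)                       // flat byte arrays before the shuffle
           .groupByKey()                                   // buffers byte[] values, not proto objects
           .mapValues(_.map(bytes => parser.parseFrom(bytes)))  // rebuild protos on the reduce side
      }
    }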

On Mon, Feb 26, 2018 at 5:34 PM, David Capwell <dcapw...@gmail.com> wrote:

> This is used to predict the current cost of memory so Spark knows whether to
> flush or not. This is very costly for us, so we use a flag marked in the code
> as private to lower the cost:
>
> spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no
> typo) - how many records before a flush.
>
> This lowers the cost because it lets us leave data in the young generation;
> if we don't bound it, everything gets promoted to old and GC becomes an
> issue.  This doesn't solve the fact that the walk is slow, but it lowers the
> cost of GC.  For us, we make sure to have spare memory on the system for the
> page cache, so spilling to disk is a memory write 99% of the time.  If your
> host has less free memory, spilling may become more expensive.
>
>
> If the walk is your bottleneck and not GC then I would recommend JOL and
> guessing to better predict memory.
>
> On Mon, Feb 26, 2018, 4:47 PM Xin Liu <xin.e@gmail.com> wrote:
>
>> Hi folks,
>>
>> We have a situation where shuffled data is protobuf-based, and
>> SizeEstimator is taking a lot of time.
>>
>> We have tried to override SizeEstimator to return a constant value, which
>> speeds things up a lot.
>>
>> My question: what is the side effect of disabling SizeEstimator? Does Spark
>> just do memory reallocation, or are there more severe consequences?
>>
>> Thanks!
>>
>


Re: SizeEstimator

2018-02-26 Thread Xin Liu
Thanks!

Our protobuf object is fairly complex. Even O(N) takes a lot of time.

On Mon, Feb 26, 2018 at 6:33 PM, 叶先进 <advance...@gmail.com> wrote:

> Hi Xin Liu,
>
> Could you provide a concrete use case if possible (code to reproduce the
> protobuf object and a comparison between protobuf and normal objects)?
>
> I contributed a bit to SizeEstimator years ago, and to my understanding,
> the time complexity should be O(N), where N is the number of referenced
> fields, counted recursively.
>
> We should definitely investigate this case if it indeed takes a lot of
> time on protobuf objects.
>
>
> On 27 Feb 2018, at 8:47 AM, Xin Liu <xin.e@gmail.com> wrote:
>
> Hi folks,
>
> We have a situation where shuffled data is protobuf-based, and
> SizeEstimator is taking a lot of time.
>
> We have tried to override SizeEstimator to return a constant value, which
> speeds things up a lot.
>
> My question: what is the side effect of disabling SizeEstimator? Does Spark
> just do memory reallocation, or are there more severe consequences?
>
> Thanks!
>
>
>


Re: SizeEstimator

2018-02-26 Thread 叶先进
Hi Xin Liu,

Could you provide a concrete use case if possible (code to reproduce the
protobuf object and a comparison between protobuf and normal objects)?

I contributed a bit to SizeEstimator years ago, and to my understanding, the
time complexity should be O(N), where N is the number of referenced fields,
counted recursively.

We should definitely investigate this case if it indeed takes a lot of time on 
protobuf objects.

> On 27 Feb 2018, at 8:47 AM, Xin Liu <xin.e@gmail.com> wrote:
> 
> Hi folks,
> 
> We have a situation where shuffled data is protobuf-based, and SizeEstimator
> is taking a lot of time.
> 
> We have tried to override SizeEstimator to return a constant value, which
> speeds things up a lot.
> 
> My question: what is the side effect of disabling SizeEstimator? Does Spark
> just do memory reallocation, or are there more severe consequences?
> 
> Thanks!



Re: SizeEstimator

2018-02-26 Thread David Capwell
This is used to predict the current cost of memory so Spark knows whether to
flush or not. This is very costly for us, so we use a flag marked in the code
as private to lower the cost:

spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no typo)
- how many records before a flush.

This lowers the cost because it lets us leave data in the young generation; if
we don't bound it, everything gets promoted to old and GC becomes an issue.
This doesn't solve the fact that the walk is slow, but it lowers the cost of
GC.  For us, we make sure to have spare memory on the system for the page
cache, so spilling to disk is a memory write 99% of the time.  If your host
has less free memory, spilling may become more expensive.
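For reference, a minimal sketch of setting that flag when building the session
(the 500000 threshold is an assumption; it is an internal, undocumented setting,
so tune and validate it for your workload):

    import org.apache.spark.sql.SparkSession

    object BoundedSpill {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("bounded-spill")
          // Force a spill after this many buffered records, regardless of what
          // SizeEstimator reports; keeps the buffer (and its estimation cost) bounded.
          .config("spark.shuffle.spill.numElementsForceSpillThreshold", "500000")
          .getOrCreate()

        // ... job code ...

        spark.stop()
      }
    }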


If the walk is your bottleneck and not GC then I would recommend JOL and
guessing to better predict memory.
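As a rough example of the JOL approach (requires the org.openjdk.jol:jol-core
dependency; the Record case class is a hypothetical stand-in for your row
type): measure one representative object once and multiply by your record
count, instead of letting Spark walk the graph repeatedly.

    import org.openjdk.jol.info.GraphLayout

    object JolSizing {
      // Hypothetical stand-in for the shuffled row type.
      case class Record(key: String, value: Long)

      def main(args: Array[String]): Unit = {
        val sample = Record("example-key", 42L)
        // Walks the object graph of this single instance and reports retained bytes.
        val bytesPerRecord = GraphLayout.parseInstance(sample).totalSize()
        println(s"~$bytesPerRecord bytes per record; multiply by your buffered row count")
      }
    }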

On Mon, Feb 26, 2018, 4:47 PM Xin Liu <xin.e@gmail.com> wrote:

> Hi folks,
>
> We have a situation where shuffled data is protobuf-based, and
> SizeEstimator is taking a lot of time.
>
> We have tried to override SizeEstimator to return a constant value, which
> speeds things up a lot.
>
> My question: what is the side effect of disabling SizeEstimator? Does Spark
> just do memory reallocation, or are there more severe consequences?
>
> Thanks!
>


SizeEstimator

2018-02-26 Thread Xin Liu
Hi folks,

We have a situation where shuffled data is protobuf-based, and
SizeEstimator is taking a lot of time.

We have tried to override SizeEstimator to return a constant value, which
speeds things up a lot.

My question: what is the side effect of disabling SizeEstimator? Does Spark
just do memory reallocation, or are there more severe consequences?

Thanks!


SizeEstimator for python

2016-08-15 Thread Maurin Lenglart
Hi,
Is there a way to estimate the size of a DataFrame in Python?
Something similar to
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/util/SizeEstimator.html ?

thanks


Re: OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Xianjin YE
What are your JVM heap size settings?  The OOM in SizeEstimator is caused by a
lot of entries in the IdentityHashMap.
A quick guess is that the objects in your dataset are a custom class and you
didn't implement the hashCode and equals methods correctly.



On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote:

 I am aggregating a dataset using the combineByKey method, and for a certain
 input size the job fails with the following error. I have enabled heap dumps
 to better analyze the issue and will report back if I have any findings.
 Meanwhile, if you guys have any idea of what could possibly result in this
 error or how to better debug this, please let me know.
 
 java.lang.OutOfMemoryError: Java heap space
 at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
 at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
 at 
 org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
 at 
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
 at 
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at 
 org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
 at 
 org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
 at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
 at 
 org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
 at 
 org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
 at 
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
 at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
 at 
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
 at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)







OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
I am aggregating a dataset using the combineByKey method, and for a certain
input size the job fails with the following error. I have enabled heap
dumps to better analyze the issue and will report back if I have any
findings. Meanwhile, if you guys have any idea of what could possibly
result in this error or how to better debug this, please let me know.

java.lang.OutOfMemoryError: Java heap space
at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
at
org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
at
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
at
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
at
org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
at
org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at
org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)


Re: OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
I am setting spark.executor.memory to 1024m on a 3-node cluster with each
node having 4 cores and 7 GB RAM. The combiner functions take Scala case
classes as input and generate a mutable.ListBuffer of Scala case classes.
Therefore, I am guessing hashCode and equals should be taken care of.

Thanks,
Aniket

On Wed, Apr 15, 2015 at 1:00 PM Xianjin YE <advance...@gmail.com> wrote:

 What are your JVM heap size settings?  The OOM in SizeEstimator is caused
 by a lot of entries in the IdentityHashMap.
 A quick guess is that the objects in your dataset are a custom class and you
 didn't implement the hashCode and equals methods correctly.



 On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote:

  I am aggregating a dataset using the combineByKey method, and for a certain
 input size the job fails with the following error. I have enabled heap
 dumps to better analyze the issue and will report back if I have any
 findings. Meanwhile, if you guys have any idea of what could possibly
 result in this error or how to better debug this, please let me know.
 
  java.lang.OutOfMemoryError: Java heap space
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
  at
 org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
  at
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
  at
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at
 org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
  at
 org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
  at
 org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at
 org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
  at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
  at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
  at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
  at
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)






SizeEstimator in Spark 1.1 and high load/object allocation when reading in data

2014-10-30 Thread Erik Freed
Hi All,

We have recently moved to Spark 1.1 from 0.9 for an application handling a
fair number of very large datasets partitioned across multiple nodes. About
half of each of these large datasets is stored in off-heap byte arrays and
about half in the standard Java heap.

While these datasets are being loaded from our custom HDFS 2.3 RDD, and
before we are using even a fraction of the available Java heap and the
native off-heap memory, the loading slows to an absolute crawl. It is clear
from our profiling of the Spark executor that the Spark SizeEstimator is
demanding an extremely high CPU load along with a fast and furious
allocation of Object[] instances.  We do not believe we were seeing this
sort of behavior in 0.9, and we have noticed rather significant changes in
this part of the BlockManager code going from 0.9 to 1.1 and beyond. A GC
run gets rid of all of the Object[] instances.

Before we start spending large amounts of time either switching back to 0.9
or tracing further toward the root cause of this, I was wondering if anyone
out there had enough experience with that part of the code (or had run into
the same problem) and could help us understand what sort of root causes
might lie behind this strange behavior and, even better, what we could do
to resolve them.

Any help would be very much appreciated.

cheers,
Erik