Thanks for the reply, and sorry for my delayed response; I had to go find the profile data to look up the class again.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

That class has a field "map" which buffers the rows and is size-tracked with SizeEstimator. In my case the buffer held > 1 million rows, so it became costly every time it was checked. This can be reproduced: create a random data set of (string, long), then group by string (I believe this is what the code did first; there was a sort later, but that should have been a different stage). Make sure the number of executors is small (for example, only one); otherwise you shrink the buffer on each executor (fewer rows per executor).
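A minimal sketch of that reproduction (the app name, row count, and key cardinality are illustrative, not from the original job):

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.util.Random

    object SizeEstimatorRepro {
      def main(args: Array[String]): Unit = {
        // Submit with a single executor (e.g. --num-executors 1) so one JVM
        // buffers as many rows as possible during the shuffle.
        val sc = new SparkContext(new SparkConf().setAppName("size-estimator-repro"))

        // Random (string, long) pairs with low key cardinality, so each group
        // buffers a large number of rows on the reduce side before spilling.
        val rows = sc.parallelize(1 to 5000000, numSlices = 8)
          .map(_ => (s"key-${Random.nextInt(100)}", Random.nextLong()))

        // An RDD groupByKey shuffles through Spark's size-tracked collections
        // (ExternalSorter / ExternalAppendOnlyMap), which is where SizeEstimator
        // repeatedly walks the buffered rows.
        rows.groupByKey().mapValues(_.size).count()

        sc.stop()
      }
    }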
On Mon, Feb 26, 2018, 10:04 PM 叶先进 <advance...@gmail.com> wrote:

> What type is the buffer you mentioned?
>
> On 27 Feb 2018, at 11:46 AM, David Capwell <dcapw...@gmail.com> wrote:
>
> advancedxy <advance...@gmail.com>, I don't remember the code as well anymore, but what we hit was a very simple schema (string, long). The issue is that the buffer held a million of these, so SizeEstimator had to keep recalculating the same elements over and over again. SizeEstimator was on-CPU about 30% of the time; bounding the buffer got it below 5% (going off memory, so I may be off).
>
> The class info (the size of its fields as laid out on the heap) is cached for every class encountered, so the size info for repeated elements of the same class is not recalculated. However, for a Collection class (or similar), SizeEstimator will scan all the elements in the container (the `next` field in LinkedList, for example).
>
> And arrays are a special case: SizeEstimator will sample an array if array.length > ARRAY_SIZE_FOR_SAMPLING (400).
>
> The cost is really (assuming memory access is O(1), which is not true) O(N × M), where N is the number of rows in the buffer and M is the size of the schema. My case could be solved by not recomputing, which would bring the cost down to O(M), since the bookkeeping should be constant time. There was logic to delay recalculating based on a change in frequency, but that didn't really do much for us; bounding and spilling was the bigger win in our case.
>
> On Mon, Feb 26, 2018, 7:24 PM Xin Liu <xin.e....@gmail.com> wrote:
>
>> Thanks David. Another solution is to convert the protobuf object to a byte array; it does speed up SizeEstimator.
>>
>> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell <dcapw...@gmail.com> wrote:
>>
>>> This is used to predict the current cost of memory so Spark knows whether or not to flush. This is very costly for us, so we use a flag marked in the code as private to lower the cost:
>>>
>>> spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no typo) - how many records to buffer before a forced flush.
>>>
>>> This lowers the cost because it lets us leave data in the young generation; if we don't bound the buffer, everything gets promoted to the old generation and GC becomes an issue. This doesn't solve the fact that the walk is slow, but it lowers the cost of GC. We make sure to have spare memory on the system for the page cache, so spilling to disk is a memory write 99% of the time for us. If your host has less free memory, spilling may become more expensive.
>>>
>>> If the walk is your bottleneck and not GC, then I would recommend JOL and guessing to better predict memory.
>>>
>>> On Mon, Feb 26, 2018, 4:47 PM Xin Liu <xin.e....@gmail.com> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> We have a situation where the shuffled data is protobuf based, and SizeEstimator is taking a lot of time.
>>>>
>>>> We have tried to override SizeEstimator to return a constant value, which speeds things up a lot.
>>>>
>>>> My question is: what are the side effects of disabling SizeEstimator? Is it just that Spark does memory reallocation, or are there more severe consequences?
>>>>
>>>> Thanks!
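For reference, the private knob David mentions above is set like this (the threshold value here is illustrative, and since the config is internal it may change between Spark versions):

    import org.apache.spark.SparkConf

    // Force a spill after a fixed number of buffered records, regardless of the
    // estimated size, so the buffer SizeEstimator walks stays bounded and rows
    // stay in the young generation. Tune the value against your row size and heap.
    val conf = new SparkConf()
      .setAppName("bounded-shuffle-spill")
      .set("spark.shuffle.spill.numElementsForceSpillThreshold", "500000")

Xin's byte-array approach attacks the cost from the other side: a flat byte[] gives SizeEstimator a single array to measure (or sample) instead of an object graph to walk.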