What does the plan look like if you run explain? In 1.5 the default should be TungstenAggregate, which does spill (switching from hash-based aggregation to sort-based aggregation when memory runs low).
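The hash-to-sort fallback can be sketched in plain Scala. This is a minimal, self-contained illustration of the idea, not Spark's actual TungstenAggregate code; the `maxInMemoryKeys` threshold and the in-memory "overflow" buffer standing in for on-disk spill files are assumptions made for the sketch:

```scala
// Toy model of hash-based aggregation with a sort-based fallback.
// NOT Spark's implementation -- just the concept: aggregate into a hash
// map until it reaches a size budget, then divert further input to runs
// that are sorted by key and merged (in Spark those runs would be
// spilled to disk, keeping memory bounded).
object HashToSortAggregation {
  def aggregate(input: Iterator[(String, Long)],
                maxInMemoryKeys: Int): Map[String, Long] = {
    val hashMap  = scala.collection.mutable.HashMap.empty[String, Long]
    val overflow = scala.collection.mutable.ArrayBuffer.empty[(String, Long)]
    var fellBack = false

    for ((k, v) <- input) {
      // Keep hashing while the key is already present or the map is
      // under budget; otherwise switch to the sort-based path for the
      // rest of the input.
      if (!fellBack && (hashMap.contains(k) || hashMap.size < maxInMemoryKeys)) {
        hashMap(k) = hashMap.getOrElse(k, 0L) + v
      } else {
        fellBack = true
        overflow += ((k, v)) // in Spark: spilled to a sorted run on disk
      }
    }

    // Merge the sorted run back into the partial results. A real
    // implementation would stream sorted runs from disk and merge them
    // without re-growing the map; we fold them in directly to keep the
    // sketch short.
    for ((k, v) <- overflow.sortBy(_._1)) {
      hashMap(k) = hashMap.getOrElse(k, 0L) + v
    }
    hashMap.toMap
  }
}
```

The key property is that once the budget is hit, new keys stop growing the hash map; they flow to sorted runs instead, which is why the sort-based path degrades gracefully where a pure hash map would OOM.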
On Mon, Sep 21, 2015 at 5:34 PM, Matt Cheah <mch...@palantir.com> wrote:
> Hi everyone,
>
> I’m debugging some slowness and apparent memory pressure + GC issues after
> I ported some workflows from raw RDDs to Data Frames. In particular, I’m
> looking into an aggregation workflow that computes many aggregations per
> key at once.
>
> My workflow before was doing a fairly straightforward combineByKey call
> where the aggregation would build up non-trivially sized objects in memory
> – I was computing numerous sums over various fields of the data at a time.
> In particular, I noticed that there was spilling to disk on the map side of
> the aggregation.
>
> When I switched to using DataFrames aggregation – particularly
> DataFrame.groupBy(some list of keys).agg(exprs), where I passed a large
> number of “Sum” exprs – the execution began to choke. I saw one of my
> executors had a long GC pause, and my job wasn’t able to recover. However,
> when I reduced the number of Sum expressions being computed in the
> aggregation, the workflow started to work normally.
>
> I have a hypothesis that I would like to run by everyone. In
> org.apache.spark.sql.execution.Aggregate.scala, branch-1.5, I’m looking at
> the execution of Aggregation
> <https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala>,
> which appears to build the aggregation result in memory by updating a
> HashMap while iterating over the rows. However, this appears to be less
> robust than what would happen if PairRDDFunctions.combineByKey were used.
> If combineByKey were used, then instead of using two mapPartitions calls
> (assuming the aggregation is partially computable, as Sum is), it would use
> the ExternalSorter and ExternalAppendOnlyMap objects to compute the
> aggregation. This would allow the aggregated result to grow large, as some
> of the aggregated result could be spilled to disk.
> This especially seems bad if the aggregation reduction factor is low; that
> is, if there are many unique keys in the dataset. In particular, the
> HashMap is holding O(# of keys * # of aggregated results per key) items in
> memory at a time.
>
> I was wondering what everyone’s thoughts on this problem are. Did we
> consciously think about the robustness implications when choosing to use
> an in-memory HashMap to compute the aggregation? Is this an inherent
> limitation of the aggregation implementation in Data Frames?
>
> Thanks,
>
> -Matt Cheah
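The O(# of keys * # of aggregated results per key) footprint described above can be made concrete with a small self-contained Scala sketch. This is a toy model, not Spark's Aggregate code; the `MultiSumAggregate` helper and its `numSums` parameter are hypothetical names for illustration:

```scala
// Toy model of a purely hash-based multi-sum aggregation: one
// partial-sum buffer per distinct key, with one slot per Sum
// expression. With K distinct keys and A Sum exprs, the map pins
// O(K * A) live values in memory, and nothing is ever spilled.
object MultiSumAggregate {
  def aggregate(rows: Seq[(String, Array[Long])],
                numSums: Int): Map[String, Array[Long]] = {
    val buffers = scala.collection.mutable.HashMap.empty[String, Array[Long]]
    for ((key, fields) <- rows) {
      // A fresh buffer is allocated on first sight of each key, so
      // live memory grows with the number of distinct keys times the
      // number of aggregates -- the low-reduction-factor failure mode.
      val buf = buffers.getOrElseUpdate(key, new Array[Long](numSums))
      var i = 0
      while (i < numSums) { buf(i) += fields(i); i += 1 }
    }
    buffers.toMap
  }
}
```

With, say, 10M distinct keys and 50 Sum expressions, this shape holds 500M live partial sums before producing any output, which is consistent with GC pressure rising as either the key cardinality or the number of Sum exprs grows.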