Hi everyone, I'm debugging some slowness and apparent memory pressure + GC issues after I ported some workflows from raw RDDs to DataFrames. In particular, I'm looking into an aggregation workflow that computes many aggregations per key at once.
My workflow before was doing a fairly straightforward combineByKey call, where the aggregation would build up non-trivially sized objects in memory since I was computing numerous sums over various fields of the data at a time. In particular, I noticed that there was spilling to disk on the map side of the aggregation.

When I switched to DataFrame aggregation, specifically DataFrame.groupBy(some list of keys).agg(exprs) where I passed a large number of "Sum" exprs, the execution began to choke. I saw one of my executors hit a long GC pause and my job wasn't able to recover. However, when I reduced the number of Sum expressions being computed in the aggregation, the workflow started to work normally. (I've put a rough sketch of both job shapes at the end of this mail.)

I have a hypothesis that I would like to run by everyone. I'm looking at the execution of aggregation in org.apache.spark.sql.execution.Aggregate.scala on branch-1.5 <https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala>, which appears to build the aggregation result in memory by iterating over the rows and updating a HashMap. This seems less robust than what would happen if PairRDDFunctions.combineByKey were used: instead of using two mapPartitions calls (assuming the aggregation is partially computable, as Sum is), combineByKey would use the ExternalSorter and ExternalAppendOnlyMap objects to compute the aggregation. That would allow the aggregated result to grow large, since some of it could be spilled to disk. The in-memory approach seems especially bad if the aggregation reduction factor is low, that is, if there are many unique keys in the dataset: the HashMap is holding O(# of keys * # of aggregated results per key) items in memory at a time.

I was wondering what everyone's thoughts on this are. Did we consciously consider the robustness implications when choosing to use an in-memory HashMap to compute the aggregation? Is this an inherent limitation of the aggregation implementation in DataFrames?

Thanks,

-Matt Cheah
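P.S. To make the comparison concrete, here is a rough sketch of the two job shapes. The column names, schema, and helper names are made up for illustration (this isn't my actual code), but the structure is the same.

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions.sum

  // (1) Old RDD job: a single combineByKey pass that folds every metric into a
  // fixed-size array of partial sums per key. As noted above, this path can
  // spill through ExternalSorter / ExternalAppendOnlyMap.
  def rddManySums(rows: RDD[(String, Array[Double])]): RDD[(String, Array[Double])] =
    rows.combineByKey[Array[Double]](
      (v: Array[Double]) => v.clone(),               // createCombiner
      (acc: Array[Double], v: Array[Double]) => {    // mergeValue: element-wise add
        var i = 0
        while (i < acc.length) { acc(i) += v(i); i += 1 }
        acc
      },
      (a: Array[Double], b: Array[Double]) => {      // mergeCombiners: element-wise add
        var i = 0
        while (i < a.length) { a(i) += b(i); i += 1 }
        a
      })

  // (2) New DataFrame job: groupBy on the key columns plus one Sum expression
  // per remaining column -- the same "many aggregations per key" shape.
  def dfManySums(df: DataFrame, keyCols: Seq[String]): DataFrame = {
    val sumExprs: Seq[Column] =
      df.columns.filterNot(c => keyCols.contains(c)).map(c => sum(c).as("sum_" + c))
    df.groupBy(keyCols.map(c => df(c)): _*).agg(sumExprs.head, sumExprs.tail: _*)
  }

The problem only shows up in shape (2) once the number of sum(...) expressions gets large; shape (1) handled the same data fine, aside from the map-side spilling I mentioned.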