Hi everyone,

I¹m debugging some slowness and apparent memory pressure + GC issues after I
ported some workflows from raw RDDs to Data Frames. In particular, I¹m
looking into an aggregation workflow that computes many aggregations per key
at once.

My workflow before was doing a fairly straightforward combineByKey call
where the aggregation would build up non-trivially sized objects in memory ­
I was computing numerous sums over various fields of the data at a time. In
particular, I noticed that there was spilling to disk on the map side of the
aggregation.

When I switched to using DataFrames aggregation ­ particularly
DataFrame.groupBy(some list of keys).agg(exprs) where I passed a large
number of ³Sum² exprs - the execution began to choke. I saw one of my
executors had a long GC pause and my job isn¹t able to recover. However when
I reduced the number of Sum expressions being computed in the aggregation,
the workflow started to work normally.

I have a hypothesis that I would like to run by everyone. In
org.apache.spark.sql.execution.Aggregate.scala, branch-1.5, I¹m looking at
the execution of Aggregation
<https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org
/apache/spark/sql/execution/Aggregate.scala> , which appears to build the
aggregation result in memory via updating a HashMap and iterating over the
rows. However this appears to be less robust than what would happen if
PairRDDFunctions.combineByKey were to be used. If combineByKey were used,
then instead of using two mapPartitions calls (assuming the aggregation is
partially-computable, as Sum is), it would use the ExternalSorter and
ExternalAppendOnlyMap objects to compute the aggregation. This would allow
the aggregated result to grow large as some of the aggregated result could
be spilled to disk. This especially seems bad if the aggregation reduction
factor is low; that is, if there are many unique keys in the dataset. In
particular, the Hash Map is holding O(# of keys * number of aggregated
results per key) items in memory at a time.

I was wondering what everyone¹s thought on this problem is. Did we
consciously think about the robustness implications when choosing to use an
in memory Hash Map to compute the aggregation? Is this an inherent
limitation of the aggregation implementation in Data Frames?

Thanks,

-Matt Cheah







Attachment: smime.p7s
Description: S/MIME cryptographic signature

Reply via email to