I was executing on Spark 1.4, so I didn't notice the Tungsten option would make spilling happen in 1.5. I'll upgrade to 1.5 and see how that turns out. Thanks!
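To check which aggregation operator a query uses, the physical plan can be printed with explain(). A minimal sketch of the kind of check being discussed, assuming a Spark 1.5 sqlContext; the input path and column names are hypothetical:

```scala
import org.apache.spark.sql.functions.sum

// Hypothetical input; path and column names are illustrative only.
val df = sqlContext.read.parquet("/path/to/data")

// Aggregate several sums per key, as in the workflow described below.
val aggregated = df.groupBy("key").agg(sum("a"), sum("b"), sum("c"))

// Print the physical plan. On 1.5 with Tungsten enabled, the plan should
// show TungstenAggregate (which can spill by falling back to sort-based
// aggregation); on 1.4 it shows the older in-memory Aggregate operator.
aggregated.explain()
```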
From: Reynold Xin <r...@databricks.com>
Date: Monday, September 21, 2015 at 5:36 PM
To: Matt Cheah <mch...@palantir.com>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim <m...@palantir.com>, Peter Faiman <peterfai...@palantir.com>
Subject: Re: DataFrames Aggregate does not spill?

What's the plan if you run explain? In 1.5 the default should be TungstenAggregate, which does spill (switching from hash-based aggregation to sort-based aggregation).

On Mon, Sep 21, 2015 at 5:34 PM, Matt Cheah <mch...@palantir.com> wrote:
> Hi everyone,
>
> I'm debugging some slowness and apparent memory pressure + GC issues after I
> ported some workflows from raw RDDs to DataFrames. In particular, I'm looking
> into an aggregation workflow that computes many aggregations per key at once.
>
> My workflow before was doing a fairly straightforward combineByKey call, where
> the aggregation would build up non-trivially sized objects in memory, since I was
> computing numerous sums over various fields of the data at a time. In
> particular, I noticed that there was spilling to disk on the map side of the
> aggregation.
>
> When I switched to DataFrames aggregation, specifically
> DataFrame.groupBy(some list of keys).agg(exprs), where I passed a large number
> of "Sum" exprs, the execution began to choke. I saw one of my executors had a
> long GC pause, and my job wasn't able to recover. However, when I reduced the
> number of Sum expressions being computed in the aggregation, the workflow
> started to work normally.
>
> I have a hypothesis that I would like to run by everyone.
> In org.apache.spark.sql.execution.Aggregate.scala on branch-1.5, I'm looking at
> the execution of Aggregation
> <https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala>,
> which appears to build the aggregation result in memory by updating a HashMap
> while iterating over the rows. However, this appears to be less robust than
> what would happen if PairRDDFunctions.combineByKey were used. If combineByKey
> were used, then instead of using two mapPartitions calls (assuming the
> aggregation is partially computable, as Sum is), it would use the
> ExternalSorter and ExternalAppendOnlyMap objects to compute the aggregation.
> This would allow the aggregated result to grow large, since some of it could be
> spilled to disk. The HashMap approach seems especially bad if the aggregation's
> reduction factor is low, that is, if there are many unique keys in the dataset.
> In particular, the HashMap holds O(# of keys * # of aggregated results per key)
> items in memory at a time.
>
> I was wondering what everyone's thoughts on this problem are. Did we
> consciously consider the robustness implications when choosing an in-memory
> HashMap to compute the aggregation? Is this an inherent limitation of the
> aggregation implementation in DataFrames?
>
> Thanks,
>
> -Matt Cheah
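For comparison, the raw-RDD path described above can be sketched with combineByKey, which performs map-side partial aggregation and can spill its per-partition combiners to disk via ExternalAppendOnlyMap. A sketch only: it assumes an existing SparkContext named sc, and the keys and numeric fields are hypothetical:

```scala
// Hypothetical records: each carries two numeric fields to be summed per key.
val pairs = sc.parallelize(Seq(
  ("k1", (1L, 10L)), ("k1", (2L, 20L)), ("k2", (3L, 30L))))

// combineByKey does partial (map-side) aggregation and, unlike the 1.4
// DataFrame Aggregate operator, can spill per-partition combiners to disk
// through ExternalAppendOnlyMap when memory runs low.
val sums = pairs.combineByKey(
  (v: (Long, Long)) => v,                                           // createCombiner
  (c: (Long, Long), v: (Long, Long)) => (c._1 + v._1, c._2 + v._2), // mergeValue
  (c1: (Long, Long), c2: (Long, Long)) =>
    (c1._1 + c2._1, c1._2 + c2._2))                                 // mergeCombiners

sums.collect().foreach(println)
```

Each additional sum widens the combiner tuple rather than adding an entry per key to an in-memory hash map of aggregation buffers, which is the robustness difference the hypothesis above points at.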