var totalTime: Long = 0
var allDF: DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else {
      allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
    }
  }
  println(s"Took $timeLen milliseconds")
  totalTime += timeLen
}
val timeLen2 = time {
  val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
  grouped2.show()
}
totalTime += timeLen2
println(s"Overall time was $totalTime milliseconds")
}
Liang-Chi Hsieh wrote
> The shuffle data can be reused only if you use the same RDD. When you
> union x1's RDD and x2's RDD in the first iteration, and then union x1's,
> x2's and x3's RDDs in the second iteration, how could they be the same
> RDD?
>
> I just used the previous example code to show that you should not recompute
> all the data since the beginning of the stream. Usually, a streaming job
> computes a summary of the data collected in a window. If you want to compute
> all the data, you should use batch instead of streaming. In other words, if
> you run a long-running streaming job, would you really want to recompute all
> the data every few seconds after one year?
>
> BTW, you don't need to compute:
>
> val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>
> in each iteration. This should be run after the loop if you want to
> compare it with batch. And you don't need to run two aggregations in the
> loop for allDF.
>
> var totalTime: Long = 0
> var allDF: DataFrame = null
> for {
>   x <- dataframes
> } {
>   val timeLen = time {
>     allDF = if (allDF == null) x else {
>       allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
>     }
>   }
>   val timeLen2 = time {
>     val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>     grouped2.show()
>   }
>   totalTime += timeLen + timeLen2
>   println(s"Took $timeLen milliseconds")
> }
> println(s"Overall time was $totalTime milliseconds")
> }
>
> Of course, this may not work for all aggregations. I just want to show that
> you do redundant work in this version compared to your batch code. For
> other aggregations, you may need a different design to do a similar job.
>
> assaf.mendelson wrote
>> I understand the actual dataframe is different, but the underlying
>> partitions are not (hence the importance of Mark's response). The code
>> you suggested would not work, as allDF and x would have different schemas
>> (x keeps the original schema while allDF becomes the grouped one).
>> I can do something like this:
>> var totalTime: Long = 0
>> var allDF: DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     val grouped = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>     allDF = if (allDF == null) grouped else {
>>       allDF.union(grouped).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
>>     }
>>     val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen milliseconds")
>> }
>> println(s"Overall time was $totalTime milliseconds")
>> }
>>
>> and this indeed improves performance (I actually had a couple more tries)
>> but:
>>
>> 1. This still gives crappy performance (for 167 slices I get a
>> throughput which is 10 times lower than batch after doing some tuning
>> including caching and coalescing)
>>
>> 2. This works because the aggregation here is sum and we don't
>> "forget". For more general aggregations we would have to join them together
>> (we can't do that for count distinct, for example), and we would need to
>> "forget" frames as they move out of the window (we can subtract a sum but
>> not a max).
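>>
>> For a sum, one way to "forget" is to keep the per-slice aggregates around
>> and subtract the slice that leaves the window. A rough sketch (assuming a
>> SparkSession `spark` with `import spark.implicits._`, and a hypothetical
>> window of 96 slices, i.e. 24 hours of 15-minute slices):
>>
>> import scala.collection.mutable
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.functions.sum
>>
>> val windowSlices = 96                        // assumed window size in slices
>> val slices = mutable.Queue.empty[DataFrame]  // per-slice pre-aggregated frames
>> var summary: DataFrame = null                // running aggregate over the window
>>
>> def advance(x: DataFrame): DataFrame = {
>>   val grouped = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>   slices.enqueue(grouped)
>>   summary = if (summary == null) grouped else
>>     summary.union(grouped).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
>>   if (slices.size > windowSlices) {
>>     // "forget" the oldest slice by unioning its negated values and re-summing;
>>     // this only works because sum is invertible (a max cannot be subtracted)
>>     val oldest = slices.dequeue()
>>     summary = summary.union(oldest.select($"cat1", $"cat2", (-$"v").alias("v")))
>>       .groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
>>   }
>>   summary
>> }
>>
>> In practice the lineage of `summary` keeps growing, so it would need periodic
>> caching or checkpointing; and again, this trick does not generalize to
>> non-invertible aggregations like max or count distinct.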
>>
>> The best solution I found so far (performance-wise) was to write a custom
>> UDAF which does the windowing internally. This still had 8 times lower
>> throughput than batch, required a lot of coding, and is not a general
>> solution.
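>>
>> The details of that UDAF aren't shown here, but one possible shape for it
>> (a simplified, assumed sketch using the Spark 2.x UDAF API, not the original
>> implementation) is to keep a per-slice sum in the aggregation buffer, so the
>> caller can ignore slices that have fallen out of the window:
>>
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
>> import org.apache.spark.sql.types._
>>
>> class SlicedSum(numSlices: Int) extends UserDefinedAggregateFunction {
>>   def inputSchema: StructType =
>>     new StructType().add("value", DoubleType).add("slice", IntegerType)
>>   def bufferSchema: StructType =
>>     new StructType().add("sums", ArrayType(DoubleType))
>>   def dataType: DataType = ArrayType(DoubleType)   // one sum per slice
>>   def deterministic: Boolean = true
>>   def initialize(buffer: MutableAggregationBuffer): Unit =
>>     buffer(0) = Array.fill(numSlices)(0.0)
>>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>>     val sums = buffer.getSeq[Double](0).toArray
>>     sums(input.getInt(1) % numSlices) += input.getDouble(0)
>>     buffer(0) = sums
>>   }
>>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>>     val a = buffer1.getSeq[Double](0)
>>     val b = buffer2.getSeq[Double](0)
>>     buffer1(0) = a.zip(b).map { case (x, y) => x + y }
>>   }
>>   def evaluate(buffer: Row): Any = buffer.getSeq[Double](0)
>> }
>>
>> This is only meant to show the general approach; it still needs a slice
>> index as an input column and a driver-side step to sum the slices currently
>> inside the window.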
>>
>> I am looking for an approach to improve the performance even more
>> (preferably to be on par with batch, or within a relatively low factor
>> that remains constant as the number of slices grows), including the
>> option to "forget" frames.
>>
>> Assaf.
>>
>>
>>
>>
>> From: Liang-Chi Hsieh [via Apache Spark Developers List]
>> Sent: Wednesday, December 28, 2016 3:59 AM
>> To: Mendelson, Assaf
>> Subject: RE: Shuffle intermidiate results not being cached
>>
>>
>> Hi,
>>
>> In every iteration, the data you run the aggregation on is different. As I
>> showed in my previous reply:
>>
>> 1st iteration: aggregation(x1 union x2)
>> 2nd iteration: aggregation(x3 union (x1 union x2))
>> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
>>
>> In the 1st iteration you run the aggregation on the data of x1 and x2. In
>> the 2nd, the data is x1, x2 and x3. Even if you work on the same RDD, you
>> won't see reuse of the shuffle data, because the shuffled data is different.
>>
>> In your second example, I think the way to reduce the computation is
>> like:
>>
>> var totalTime: Long = 0
>> var allDF: org.apache.spark.sql.DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     // Union the previous aggregation summary with the new dataframe in this window
>>     allDF = if (allDF == null) x else allDF.union(x)
>>     val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>     allDF = grouped  // Replace the unioned data with the aggregated summary
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen milliseconds")
>> }
>> println(s"Total time was $totalTime milliseconds")
>>
>> You don't need to recompute the aggregation of the previous dataframes in
>> each iteration. You just need to take the summary and union it with the new
>> dataframe to compute the updated aggregation summary in the next iteration.
>> This is more like the streaming case; I don't think you can/should recompute
>> all the data since the beginning of a stream.
>>
>> assaf.mendelson wrote
>> The reason I thought some operations would be reused is the fact that
>> Spark automatically caches shuffle data, which means the partial
>> aggregation for previous dataframes would be saved. Unfortunately, as Mark
>> Hamstra explained, this is not the case, because the union is considered a
>> new RDD and therefore the previous data is lost.
>>
>> I am still wondering if there is any way to do high-performance streaming
>> of SQL. Basically this is not far from what a DStream would do for a sliding
>> window (e.g. 24 hours, sliding every 15 minutes), since we would be doing a
>> foreachRDD which does the joining behind the scenes.
>> The problem is that any attempt to do streaming like this results in
>> performance which is hundreds of times slower than batch.
>> Is there a correct way to do such an aggregation on streaming data (using
>> dataframes rather than RDD operations)?
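>>
>> For reference, the DStream-style version of such a sliding window would look
>> roughly like the sketch below (assuming a hypothetical DStream `events` of
>> ((cat1, cat2), value) pairs and a StreamingContext with checkpointing
>> enabled, which the inverse-reduce form requires):
>>
>> import org.apache.spark.streaming.Minutes
>>
>> val windowedSums = events.reduceByKeyAndWindow(
>>   (a: Double, b: Double) => a + b,  // add values entering the window
>>   (a: Double, b: Double) => a - b,  // subtract values leaving the window
>>   Minutes(24 * 60),                 // window length: 24 hours
>>   Minutes(15))                      // slide interval: 15 minutes
>> windowedSums.print()
>>
>> This only works for invertible aggregates like sum, and it stays at the
>> DStream/RDD level rather than dataframes, which is exactly the gap I am
>> asking about.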
>> Assaf.
>>
>>
>>
>> From: Liang-Chi Hsieh [via Apache Spark Developers List]
>> Sent: Monday, December 26, 2016 5:42 PM
>> To: Mendelson, Assaf
>> Subject: Re: Shuffle intermidiate results not being cached
>>
>>
>> Hi,
>>
>> Let me quote your example code:
>>
>> var totalTime: Long = 0
>> var allDF: org.apache.spark.sql.DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     allDF = if (allDF == null) x else allDF.union(x)
>>     val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen milliseconds")
>> }
>> println(s"Total time was $totalTime milliseconds")
>>
>>
>> Basically, what you do is union some dataframes in each iteration and run
>> the aggregation on this unioned data. I don't see any reused operations.
>>
>> 1st iteration: aggregation(x1 union x2)
>> 2nd iteration: aggregation(x3 union (x1 union x2))
>> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
>> ...
>>
>> Your first example does just two aggregation operations, but your second
>> example, as shown above, does these aggregation operations in each iteration.
>> So the time of the second example grows as the number of iterations increases.
>>
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/
>>
-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/