var totalTime: Long = 0
var allDF: DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else {
      allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
    }
  }
  println(s"Took $timeLen milliseconds")
  totalTime += timeLen
}
val timeLen2 = time {
  val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
  grouped2.show()
}
totalTime += timeLen2
println(s"Overall time was $totalTime milliseconds")
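One further note on the loop above: because allDF is rebuilt from its own previous value, its query plan (and lineage) grows with every iteration, so each action has more and more to re-derive. A minimal sketch of how to cap that growth, assuming Spark 2.1+ where Dataset.checkpoint is available and a checkpoint directory has been configured (this is an illustration, not part of the measured code):

// Assumes: spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
var allDF: DataFrame = null
for { x <- dataframes } {
  allDF = if (allDF == null) x else {
    allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
  }
  // Eagerly materialize the running summary and truncate its lineage,
  // so the plan stays constant-size instead of growing each iteration.
  allDF = allDF.checkpoint()
}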
Liang-Chi Hsieh wrote
> The shuffle data can be reused only if you use the same RDD. When you
> union x1's RDD and x2's RDD in the first iteration, and union x1's RDD,
> x2's RDD and x3's RDD in the 2nd iteration, how could they be the same
> RDD?
>
> I just used the previous example code to show that you should not recompute
> all the data from the beginning of the stream. Usually, a streaming job
> computes a summary of the collected data in a window. If you want to compute
> all the data, you should use batch instead of streaming. In other words, if
> you run a long-running streaming job, would you like to recompute all the
> data every few seconds after one year?
>
> BTW, you don't need to compute:
>
> val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>
> for each iteration. This should be run after the loop if you want to
> compare it with batch. And you don't need to run two aggregations in the
> loop for allDF.
>
> var totalTime: Long = 0
> var allDF: DataFrame = null
> for {
>   x <- dataframes
> } {
>   val timeLen = time {
>     allDF = if (allDF == null) x else {
>       allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
>     }
>   }
>   val timeLen2 = time {
>     val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>     grouped2.show()
>   }
>   totalTime += timeLen + timeLen2
>   println(s"Took $timeLen milliseconds")
> }
> println(s"Overall time was $totalTime milliseconds")
>
> Of course, this may not work for all aggregations. I just want to show that
> you do redundant work in this version compared to your batch code. For
> other aggregations, you may need a different design to do a similar job.
>
> assaf.mendelson wrote
>> I understand the actual dataframe is different, but the underlying
>> partitions are not (hence the importance of Mark's response). The code
>> you suggested would not work, as allDF and x would have different schemas
>> (x is the original and allDF becomes the grouped result).
>> I can do something like this:
>>
>> var totalTime: Long = 0
>> var allDF: DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     val grouped = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>     allDF = if (allDF == null) grouped else {
>>       allDF.union(grouped).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
>>     }
>>     val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen milliseconds")
>> }
>> println(s"Overall time was $totalTime milliseconds")
>>
>> and this indeed improves performance (I actually made a couple more
>> attempts), but:
>>
>> 1. This still gives crappy performance (for 167 slices I get a
>> throughput which is 10 times lower than batch, after doing some tuning
>> including caching and coalescing).
>>
>> 2. This works because the aggregation here is sum and we never
>> "forget". For more general aggregations we would have to join the frames
>> together (we can't do it for count distinct, for example), and we would
>> need to "forget" frames when they move out of the window (we can subtract
>> a sum but not a max).
>>
>> The best solution I found so far (performance-wise) was to write a custom
>> UDAF which does the window internally. This still had throughput 8 times
>> lower than batch, required a lot of coding, and is not a general solution.
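On the "forgetting" point above: for an invertible aggregate like sum, a slice that leaves the window can be subtracted by unioning its negated values back into the running summary. A rough sketch only, where `leaving` is a hypothetical dataframe holding the expired slice already pre-aggregated to the same (cat1, cat2, v) schema as allDF; as noted above, this does not generalize to max or count distinct:

// `leaving` (hypothetical) is the slice that just fell out of the window,
// aggregated to the same (cat1, cat2, v) schema as allDF.
val negated = leaving.withColumn("v", -$"v")
allDF = allDF.union(negated)
  .groupBy("cat1", "cat2")
  .agg(sum($"v").alias("v"))  // the negated rows cancel the expired slice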
>> I am looking for an approach to improve the performance even more
>> (preferably to be on par with batch, or within a relatively low factor
>> that remains constant as the number of slices grows), including the
>> option to "forget" frames.
>>
>> Assaf.
>>
>> From: Liang-Chi Hsieh [via Apache Spark Developers List]
>> Sent: Wednesday, December 28, 2016 3:59 AM
>> To: Mendelson, Assaf
>> Subject: RE: Shuffle intermediate results not being cached
>>
>> Hi,
>>
>> Every iteration, the data you run the aggregation on is different. As I
>> showed in a previous reply:
>>
>> 1st iteration: aggregation(x1 union x2)
>> 2nd iteration: aggregation(x3 union (x1 union x2))
>> 3rd iteration: aggregation(x4 union (x3 union (x1 union x2)))
>>
>> In the 1st you run the aggregation on the data of x1 and x2; in the 2nd
>> the data is x1, x2 and x3. Even though you work on the same RDDs, you
>> won't see reuse of the shuffle data, because the shuffle data is different.
>>
>> In your second example, I think the way to reduce the computation is
>> something like:
>>
>> var totalTime: Long = 0
>> var allDF: org.apache.spark.sql.DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     // Union the previous aggregation summary with the new dataframe in this window
>>     allDF = if (allDF == null) x else allDF.union(x)
>>     val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>     allDF = grouped // Replace the unioned data with the aggregated summary
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen milliseconds")
>> }
>> println(s"Total time was $totalTime milliseconds")
>>
>> You don't need to recompute the aggregation of the previous dataframes in
>> each iteration. You just need to take the summary and union it with the
>> new dataframe to compute the newer aggregation summary in the next
>> iteration. This is more similar to the streaming case; I don't think you
>> can (or should) recompute all the data since the beginning of a stream.
>>
>> assaf.mendelson wrote
>> The reason I thought some operations would be reused is the fact that
>> Spark automatically caches shuffle data, which means the partial
>> aggregations for previous dataframes would be saved. Unfortunately, as
>> Mark Hamstra explained, this is not the case: the result is considered a
>> new RDD, and therefore the previous data is lost.
>>
>> I am still wondering if there is any way to do high-performance streaming
>> of SQL. Basically this is not far from what DStream would do for a sliding
>> window (e.g. 24 hours every 15 minutes), since we would be doing a
>> foreachRDD which would do the joining behind the scenes.
>> The problem is that any attempt to do streaming like this results in
>> performance which is hundreds of times slower than batch.
>> Is there a correct way to do such an aggregation on streaming data (using
>> dataframes rather than RDD operations)?
>> Assaf.
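To make the shuffle-reuse behavior concrete: Spark skips a shuffle stage only when the very same shuffled RDD object is executed again; a DataFrame plan that is rebuilt each iteration compiles down to new RDDs, so nothing is skipped. A small illustrative sketch, assuming a SparkContext `sc` and a hypothetical `pairs: Seq[(String, Long)]`:

// The same shuffled RDD object reuses its shuffle files on re-execution:
val shuffled = sc.parallelize(pairs).reduceByKey(_ + _)
shuffled.count()    // triggers the shuffle
shuffled.collect()  // same RDD: the shuffle map stage is skipped this time
// Re-deriving the pipeline creates a new RDD, so the shuffle runs again:
val rebuilt = sc.parallelize(pairs).reduceByKey(_ + _)
rebuilt.collect()   // full shuffle; nothing is reused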
>> From: Liang-Chi Hsieh [via Apache Spark Developers List]
>> Sent: Monday, December 26, 2016 5:42 PM
>> To: Mendelson, Assaf
>> Subject: Re: Shuffle intermediate results not being cached
>>
>> Hi,
>>
>> Let me quote your example code:
>>
>> var totalTime: Long = 0
>> var allDF: org.apache.spark.sql.DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     allDF = if (allDF == null) x else allDF.union(x)
>>     val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen milliseconds")
>> }
>> println(s"Total time was $totalTime milliseconds")
>>
>> Basically, what you do is union some dataframes in each iteration and run
>> the aggregation on the unioned data. I don't see any reused operations.
>>
>> 1st iteration: aggregation(x1 union x2)
>> 2nd iteration: aggregation(x3 union (x1 union x2))
>> 3rd iteration: aggregation(x4 union (x3 union (x1 union x2)))
>> ...
>>
>> Your first example does just two aggregation operations, but your second
>> example, as above, does these aggregation operations in every iteration,
>> so the time of the second example grows as the number of iterations
>> increases.
>>
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/
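For reference, the `time` helper used in all the snippets above was never posted in the thread. A plausible definition, purely an assumption on my part, would be:

// Hypothetical definition of the `time` helper used above: evaluates a
// block and returns the elapsed wall-clock time in milliseconds.
def time[T](block: => T): Long = {
  val start = System.currentTimeMillis()
  block
  System.currentTimeMillis() - start
}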
-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/