  var totalTime: Long = 0
  var allDF: DataFrame = null
  for {
    x <- dataframes
  } {
    val timeLen = time {
      allDF = if (allDF == null) x else {
        allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
      }
    }
    println(s"Took $timeLen miliseconds")
    totalTime += timeLen
  }
  val timeLen2 = time {
    val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen2
  println(s"Overall time was $totalTime miliseconds")
} 
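
If an action is run inside the loop (as in the earlier versions quoted below that call show() every iteration), one tweak worth trying is to materialize the running summary each iteration and drop the previous one, so each new iteration starts from the cached summary instead of re-deriving the whole union chain. A rough sketch, assuming the same dataframes and column names as above (with sum from org.apache.spark.sql.functions and spark.implicits._ in scope):

  var allDF: DataFrame = null
  for {
    x <- dataframes
  } {
    val updated = if (allDF == null) {
      x.groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
    } else {
      allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
    }
    updated.cache()
    updated.count()                        // force materialization of the new summary
    if (allDF != null) allDF.unpersist()   // release the previous summary
    allDF = updated
  }

Whether this pays off depends on memory, but it keeps the per-iteration work roughly constant instead of growing with the number of slices.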



Liang-Chi Hsieh wrote
> The shuffle data can be reused only if you use the same RDD. When you
> union x1's RDD and x2's RDD in the first iteration, and union x1's RDD,
> x2's RDD and x3's RDD in the second iteration, why would you expect them
> to be the same RDD?
> 
> I just used the previous example code to show that you should not recompute
> all the data since the beginning of the stream. Usually, a streaming job
> computes a summary of the data collected in a window. If you want to compute
> all the data, you should use batch instead of streaming. In other words, if
> you ran a long-running streaming job, would you really want to recompute all
> the data every few seconds after one year?
> 
> BTW, you don't need to compute:
> 
>       val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
> 
> for each iteration. It should be run after the loop if you want to
> compare with batch. And you don't need to run two aggregations in the
> loop for allDF.
> 
>   var totalTime: Long = 0
>   var allDF: DataFrame = null
>   for {
>     x <- dataframes
>   } {
>     val timeLen = time {
>       allDF = if (allDF == null) x else {
>         allDF.union(x).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
>       }
>     }
>     val timeLen2 = time {
>       val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>       grouped2.show()
>     }
>     totalTime += timeLen + timeLen2
>     println(s"Took $timeLen miliseconds")
>   }
>   println(s"Overall time was $totalTime miliseconds")
> }
> 
> Of course, this may not work for all aggregations. I'm just showing that
> this version does redundant work compared with your batch code. For
> other aggregations, you may need a different design to do a similar job.
> 
> assaf.mendelson wrote
>> I understand the actual dataframe is different, but the underlying
>> partitions are not (hence the importance of Mark's response). The code
>> you suggested would not work as allDF and x would have different schemas
>> (x is the original and allDF becomes the grouped result).
>> I can do something like this:
>>   var totalTime: Long = 0
>>   var allDF: DataFrame = null
>>   for {
>>     x <- dataframes
>>   } {
>>     val timeLen = time {
>>       val grouped = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>       allDF = if (allDF == null) grouped else {
>>         allDF.union(grouped).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
>>       }
>>       val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>       grouped2.show()
>>     }
>>     totalTime += timeLen
>>     println(s"Took $timeLen milliseconds")
>>   }
>>   println(s"Overall time was $totalTime milliseconds")
>> }
>> 
>> and this indeed improves performance (I actually had a couple more tries)
>> but:
>> 
>> 1. This still gives crappy performance (for 167 slices I get a throughput
>> which is 10 times lower than batch, after doing some tuning including
>> caching and coalescing).
>> 
>> 2. This works because the aggregation here is a sum and we never "forget"
>> anything. For more general aggregations we would have to join them together
>> (we can't do it for count distinct, for example), and we would need to
>> "forget" frames as they move out of the window (we can subtract a sum but
>> not a max).
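
For the sum case specifically, the "forgetting" step can at least be sketched: keep each slice's per-key summary around, and when a slice leaves the window, re-aggregate the running totals together with that slice's summary negated. A rough sketch, where expiring is a hypothetical handle to the per-slice summary that just fell out of the window:

  // allDF holds the running per-(cat1, cat2) sums; expiring is the slice summary leaving the window.
  val withoutExpired = allDF
    .union(expiring.withColumn("v", -$"v"))   // negate the expiring slice's partial sums
    .groupBy("cat1", "cat2")
    .agg(sum($"v").alias("v"))

As noted above, this only works for invertible aggregates such as sum and count; a max or a count distinct cannot be "un-merged" this way.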
>> 
>> The best solution I found so far (performance-wise) was to write a custom
>> UDAF which handles the window internally. This still had 8 times lower
>> throughput than batch, required a lot of coding, and is not a general
>> solution.
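
The custom UDAF itself isn't shown in the thread; purely as an illustration of the general shape such an aggregator could take (not the actual implementation), here is a minimal Spark 2.x UserDefinedAggregateFunction sketch that keeps one partial sum per hypothetical slice id, so that slices outside the window can be ignored at evaluation time:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  import org.apache.spark.sql.types._

  // minSlice is the oldest slice id still inside the window.
  class WindowedSum(minSlice: Int) extends UserDefinedAggregateFunction {
    def inputSchema: StructType =
      new StructType().add("slice", IntegerType).add("v", DoubleType)
    def bufferSchema: StructType =
      new StructType().add("perSlice", MapType(IntegerType, DoubleType))
    def dataType: DataType = DoubleType
    def deterministic: Boolean = true

    def initialize(buffer: MutableAggregationBuffer): Unit =
      buffer(0) = Map.empty[Int, Double]

    def update(buffer: MutableAggregationBuffer, input: Row): Unit =
      if (!input.isNullAt(0) && !input.isNullAt(1)) {
        val m = buffer.getAs[Map[Int, Double]](0)
        val slice = input.getInt(0)
        buffer(0) = m + (slice -> (m.getOrElse(slice, 0.0) + input.getDouble(1)))
      }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      val m1 = buffer1.getAs[Map[Int, Double]](0)
      val m2 = buffer2.getAs[Map[Int, Double]](0)
      buffer1(0) = (m1.keySet ++ m2.keySet)
        .map(k => k -> (m1.getOrElse(k, 0.0) + m2.getOrElse(k, 0.0))).toMap
    }

    // Sum only the slices that are still inside the window.
    def evaluate(buffer: Row): Any =
      buffer.getAs[Map[Int, Double]](0).collect {
        case (slice, v) if slice >= minSlice => v
      }.sum
  }

Usage would look something like df.groupBy("cat1").agg(new WindowedSum(minSlice)($"slice", $"v")), assuming each row carries a slice column; the map-typed buffer is also a hint at why such an approach ends up well below the throughput of the built-in aggregates.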
>> 
>> I am looking for an approach that improves the performance even more
>> (preferably either on par with batch, or a relatively low factor that
>> remains constant as the number of slices rises), and that includes the
>> option to "forget" frames.
>> 
>> Assaf.
>> 
>> 
>> 
>> 
>> From: Liang-Chi Hsieh [via Apache Spark Developers List]
>> Sent: Wednesday, December 28, 2016 3:59 AM
>> To: Mendelson, Assaf
>> Subject: RE: Shuffle intermidiate results not being cached
>> 
>> 
>> Hi,
>> 
>> In every iteration, the data you run the aggregation on is different. As I
>> showed in the previous reply:
>> 
>> 1st iteration: aggregation(x1 union x2)
>> 2nd iteration: aggregation(x3 union (x1 union x2))
>> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
>> 
>> In the 1st iteration you run the aggregation on the data of x1 and x2. In
>> the 2nd the data is x1, x2 and x3. Even if you work on the same RDD, you
>> won't see reuse of the shuffle data because the shuffle data itself is
>> different.
>> 
>> In your second example, I think the way to reduce the computation is
>> something like this:
>> 
>> var totalTime: Long = 0
>> var allDF: org.apache.spark.sql.DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     // Union the previous aggregation summary with the new dataframe in this window
>>     allDF = if (allDF == null) x else allDF.union(x)
>>     val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>     allDF = grouped  // Replace the unioned data with the aggregated summary
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen milliseconds")
>> }
>> println(s"Total time was $totalTime milliseconds")
>> 
>> You don't need to recompute the aggregation of the previous dataframes in
>> each iteration. You just need to take the summary and union it with the new
>> dataframe to compute the newer aggregation summary in the next iteration.
>> This is closer to the streaming case; I don't think you can or should
>> recompute all the data since the beginning of a stream.
>> 
>> assaf.mendelson wrote
>> The reason I thought some operations would be reused is the fact that
>> Spark automatically caches shuffle data, which means the partial
>> aggregation for previous dataframes would be saved. Unfortunately, as Mark
>> Hamstra explained, this is not the case because each iteration produces a
>> new RDD and therefore the previous data is lost.
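
For contrast, the case where shuffle output really is reused is when the very same shuffled RDD is acted on again; a small standalone sketch (with a made-up dataset, assuming sc is the SparkContext):

  val pairs   = sc.parallelize(1 to 1000000).map(i => (i % 100, 1L))
  val reduced = pairs.reduceByKey(_ + _)   // shuffle happens here
  reduced.count()   // first action runs the shuffle map stage
  reduced.count()   // second action on the SAME RDD: the map stage shows up as "skipped"

The union in the loop produces a new lineage every iteration, so this skipped-stage reuse never applies to it.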
>> 
>> I am still wondering if there is any way to do high-performance streaming
>> of SQL. Basically this is not far from what DStream would do for a sliding
>> window (e.g. 24 hours, sliding every 15 minutes), where a foreachRDD would
>> do the joining behind the scenes.
>> The problem is that any attempt to stream like this results in performance
>> which is hundreds of times slower than batch.
>> Is there a correct way to do such an aggregation on streaming data (using
>> dataframes rather than RDD operations)?
>> Assaf.
>> 
>> 
>> 
>> From: Liang-Chi Hsieh [via Apache Spark Developers List]
>> Sent: Monday, December 26, 2016 5:42 PM
>> To: Mendelson, Assaf
>> Subject: Re: Shuffle intermidiate results not being cached
>> 
>> 
>> Hi,
>> 
>> Let me quote your example code:
>> 
>> var totalTime: Long = 0
>> var allDF: org.apache.spark.sql.DataFrame = null
>> for {
>>   x <- dataframes
>> } {
>>   val timeLen = time {
>>     allDF = if (allDF == null) x else allDF.union(x)
>>     val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>>     grouped2.show()
>>   }
>>   totalTime += timeLen
>>   println(s"Took $timeLen milliseconds")
>> }
>> println(s"Total time was $totalTime milliseconds")
>> 
>> 
>> Basically what you do is union some dataframes in each iteration, and run
>> the aggregation on this unioned data. I don't see any operations that can
>> be reused.
>> 
>> 1st iteration: aggregation(x1 union x2)
>> 2nd iteration: aggregation(x3 union (x1 union x2))
>> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
>> ...
>> 
>> Your first example does just two aggregation operations. But your second
>> example, as shown above, does these aggregation operations in each
>> iteration. So the time of the second example grows as the iterations
>> increase.
>> 
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/





-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 