RE: Shuffle intermediate results not being cached

2016-12-28, Liang-Chi Hsieh
ion summary in the next iteration. It
>> is more similar to the streaming case; I don't think you can/should recompute
>> all the data since the beginning of a stream.
>>
>> assaf.mendelson wrote:
>> The reason I thought some operations would be reused is the fact that
>> spar
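The approach described above (carry the aggregation summary into the next iteration instead of recomputing from the start of the stream) can be sketched with plain Scala collections. This is a toy model, not Spark code. `IncrementalAgg`, `Row`, `Summary`, `aggregateBatch`, `merge` and `recompute` are hypothetical names. Only the `cat1`/`cat2`/`valToAdd` column names come from the example code quoted in this thread.

```scala
// Toy model (plain Scala collections, no Spark) of keeping a running
// aggregation summary instead of re-aggregating every batch seen so far.
// All names here are hypothetical; cat1/cat2/valToAdd mirror the thread.
object IncrementalAgg {
  final case class Row(cat1: String, cat2: String, valToAdd: Long)

  // Running summary: sum of valToAdd per (cat1, cat2) group.
  type Summary = Map[(String, String), Long]

  // Aggregate one batch: sum valToAdd per (cat1, cat2).
  def aggregateBatch(batch: Seq[Row]): Summary =
    batch.groupBy(r => (r.cat1, r.cat2)).map { case (k, rs) =>
      k -> rs.map(_.valToAdd).sum
    }

  // Fold a new batch's partial sums into the previous summary.
  // Only the new rows and the (small) summary are touched.
  def merge(summary: Summary, batch: Seq[Row]): Summary =
    aggregateBatch(batch).foldLeft(summary) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + v)
    }

  // Full recomputation over every row seen so far (the pattern to avoid).
  def recompute(batches: Seq[Seq[Row]]): Summary =
    aggregateBatch(batches.flatten)
}
```

Because sums are associative, merging per-batch partial sums yields the same result as a full recompute. An aggregate such as an average would need to carry (sum, count) pairs instead. With DataFrames, a roughly analogous pattern would be to aggregate each new batch, union it with the previous summary, and aggregate again, persisting the summary between iterations. That adaptation is an assumption, not code from the thread.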

RE: Shuffle intermediate results not being cached

2016-12-28, Liang-Chi Hsieh
does the window internally. This was still 8 times lower
> throughput than batch and required a lot of coding and is not a general
> solution.
>
> I am looking for an approach to improve the performance even more
> (preferably to either be on par with batch or a relatively low factor
>

RE: Shuffle intermediate results not being cached

2016-12-27, assaf.mendelson
problem is that any attempt to do streaming like this results in performance that is hundreds of times slower than batch. Is there a correct way to do such an aggregation on streaming data (using dataframes rather than RDD operations)?
Assaf.

From: Liang-Chi Hsieh [via Apa

RE: Shuffle intermediate results not being cached

2016-12-27, Liang-Chi Hsieh
ch.
> Is there a correct way to do such an aggregation on streaming data (using
> dataframes rather than RDD operations)?
> Assaf.
>
> From: Liang-Chi Hsieh [via Apache Spark Developers List]
> Sent: Monday,

RE: Shuffle intermediate results not being cached

2016-12-26, assaf.mendelson
[via Apache Spark Developers List]
Sent: Monday, December 26, 2016 5:42 PM
To: Mendelson, Assaf
Subject: Re: Shuffle intermediate results not being cached

Hi,
Let me quote your example code:
var totalTime: Long = 0
var allDF

Re: Shuffle intermediate results not being cached

2016-12-26, Liang-Chi Hsieh
Hi,
Let me quote your example code:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for { x <- dataframes } {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
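A back-of-the-envelope cost model for the quoted loop, counting only rows fed into the aggregation. It assumes n batches of m rows each, that no earlier aggregation work is reused, and that an incremental variant (merging each new batch into a running summary of at most k distinct (cat1, cat2) groups) is the point of comparison. The symbols n, m, k and C are introduced here for illustration.

```latex
\begin{aligned}
C_{\text{regroup}}(n) &= \sum_{i=1}^{n} i\,m = \frac{m\,n(n+1)}{2},\\
C_{\text{incr}}(n) &\le \sum_{i=1}^{n} (m + k) = n\,(m + k),\\
\frac{C_{\text{regroup}}(n)}{C_{\text{incr}}(n)} &\ge \frac{m\,(n+1)}{2\,(m + k)} \approx \frac{n+1}{2} \quad (k \ll m).
\end{aligned}
```

For example, with n = 200 batches and k much smaller than m, the union-and-regroup loop aggregates roughly 100 times as many rows as the incremental variant. The model counts rows only and ignores per-job overhead, so it is not a prediction of the slowdowns reported in this thread.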

Re: Shuffle intermediate results not being cached

2016-12-26, Mark Hamstra
Shuffle results are only reused if you are reusing the exact same RDD. If you are working with Dataframes that you have not explicitly cached, then they are going to be producing new RDDs within their physical plan creation and evaluation, so you won't get implicit shuffle reuse. This is what
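The point that shuffle output is reused only for the exact same RDD can be illustrated with a toy memo table keyed by object identity. This is an analogy, not Spark's implementation. `ShuffleReuseToy`, `Plan` and `shuffle` are hypothetical names, and the "shuffle output" is just a sum.

```scala
import scala.collection.mutable

// Toy analogy (not Spark internals): results are memoized per object
// identity, so an equivalent but newly built plan misses the memo table.
object ShuffleReuseToy {
  // Hypothetical stand-in for an RDD. A plain class (not a case class)
  // keeps the default reference equality and identity hash code, so two
  // plans over the same data are still different keys.
  final class Plan(val rows: Seq[Int])

  private val shuffleOutputs = mutable.HashMap.empty[Plan, Int]

  // Number of times the "shuffle" actually ran.
  var computations = 0

  // Return the stored output for this exact Plan instance, or compute
  // (and count) it if this instance has not been seen before.
  def shuffle(p: Plan): Int =
    shuffleOutputs.getOrElseUpdate(p, { computations += 1; p.rows.sum })
}
```

Calling `shuffle` twice on the same `Plan` runs the computation once, while a second, structurally identical `Plan` triggers a fresh computation because it is a different object. In Spark terms, the practical counterpart is to keep and reuse one explicitly cached DataFrame (for example via `persist()` or `cache()`) rather than rebuilding an equivalent one on each iteration.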