Re: *ByKey aggregations: performance + order
I'm interested too and don't know for sure, but I do not think this case is optimized this way. However, if you know your keys aren't split across partitions and your partitions are small enough, you can implement the same grouping with mapPartitions and plain Scala collections.

On Jan 15, 2015 1:27 AM, Tobias Pfeiffer t...@preferred.jp wrote:
> Sean, thanks for your message.
>
> On Wed, Jan 14, 2015 at 8:36 PM, Sean Owen so...@cloudera.com wrote:
>> On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp wrote:
>>> OK, it seems like even on a local machine (with no network overhead), the groupByKey version is about 5 times slower than any of the other (reduceByKey, combineByKey etc.) functions...
>>
>> Even without network overhead, you're still paying the cost of setting up the shuffle and serialization.
>
> Can I pick an appropriate scheduler some time before so that Spark knows all items with the same key are on the same host? (Or enforce this?)
>
> Thanks
> Tobias
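The mapPartitions suggestion above can be sketched roughly as follows. This is only an illustration, not code from the thread: the object and method names are made up, and it assumes that every value of a given key sits in the same partition and that one partition fits in memory. The grouping function itself uses only the Scala standard library, so it can be tried without a cluster.

```scala
import scala.collection.mutable

object PartitionLocalGrouping {
  // Groups the (key, value) pairs of ONE partition, with no shuffle involved.
  // Assumption (hypothetical setup): all values of a key live in this partition.
  // LinkedHashMap keeps keys in first-seen order; each buffer keeps the
  // values in the order the iterator produced them.
  def groupWithinPartition[K, V](iter: Iterator[(K, V)]): Iterator[(K, List[V])] = {
    val groups = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[V]]
    iter.foreach { case (k, v) =>
      groups.getOrElseUpdate(k, mutable.ArrayBuffer.empty[V]) += v
    }
    groups.iterator.map { case (k, vs) => (k, vs.toList) }
  }
}

// Possible use on a pair RDD such as withKeys (requires Spark, not run here):
//   val grouped = withKeys.mapPartitions(
//     iter => PartitionLocalGrouping.groupWithinPartition(iter),
//     preservesPartitioning = true)
```

If a key does span two partitions, this sketch silently produces two separate groups for it, so it is only a substitute for groupByKey when the data layout is known in advance.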
Re: *ByKey aggregations: performance + order
Sean, thanks for your message.

On Wed, Jan 14, 2015 at 8:36 PM, Sean Owen so...@cloudera.com wrote:
> On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp wrote:
>> OK, it seems like even on a local machine (with no network overhead), the groupByKey version is about 5 times slower than any of the other (reduceByKey, combineByKey etc.) functions...
>
> Even without network overhead, you're still paying the cost of setting up the shuffle and serialization.

Can I pick an appropriate scheduler some time before so that Spark knows all items with the same key are on the same host? (Or enforce this?)

Thanks
Tobias
Re: *ByKey aggregations: performance + order
On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp wrote:
>> Now I don't know (yet) if all of the functions I want to compute can be expressed in this way and I was wondering about *how much* more expensive we are talking about.
>
> OK, it seems like even on a local machine (with no network overhead), the groupByKey version is about 5 times slower than any of the other (reduceByKey, combineByKey etc.) functions...

Even without network overhead, you're still paying the cost of setting up the shuffle and serialization.

Yes, the overhead is that groupByKey has to move all the data around. If an aggregation is really what's needed, then most of the reducing / combining can happen locally before the result ever goes anywhere else on the network. There's also the issue of keys with very large values.
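To make the "combine locally first" point concrete, here is a toy model built on plain Scala collections. It is not how Spark implements its shuffle; the two partitions, their contents, and all names are invented for illustration. It only counts how many records would have to cross the shuffle with and without a per-partition combine step.

```scala
object MapSideCombineSketch {
  // Two hypothetical partitions of (key, value) pairs.
  val partitions: Seq[Seq[(Long, Int)]] = Seq(
    Seq((0L, 1), (0L, 2), (1L, 3)),
    Seq((0L, 4), (1L, 5), (1L, 6))
  )

  // groupByKey-style: every record crosses the shuffle unchanged.
  def shuffledWithoutCombine(parts: Seq[Seq[(Long, Int)]]): Seq[(Long, Int)] =
    parts.flatten

  // reduceByKey-style: sum per key inside each partition first, so at most
  // one record per key and partition crosses the shuffle.
  def shuffledWithCombine(parts: Seq[Seq[(Long, Int)]]): Seq[(Long, Int)] =
    parts.flatMap { part =>
      part.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).sum) }.toSeq
    }

  // Reduce side: merge whatever arrived into the final per-key sums.
  def reduceSide(records: Seq[(Long, Int)]): Map[Long, Int] =
    records.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).sum) }
}
```

In this toy data, six records are shuffled without the combine step and four with it, while the final sums are the same. The saving grows with the number of values per key in each partition.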
Re: *ByKey aggregations: performance + order
Hi,

On Wed, Jan 14, 2015 at 12:11 PM, Tobias Pfeiffer t...@preferred.jp wrote:
> Now I don't know (yet) if all of the functions I want to compute can be expressed in this way and I was wondering about *how much* more expensive we are talking about.

OK, it seems like even on a local machine (with no network overhead), the groupByKey version is about 5 times slower than any of the other (reduceByKey, combineByKey etc.) functions...

  // assumes spark-shell, where sc is the predefined SparkContext
  val rdd = sc.parallelize(1 to 500)
  val withKeys = rdd.zipWithIndex.map(kv => (kv._2/10, kv._1))
  withKeys.cache()
  withKeys.count

  // around 850-1100 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.reduceByKey(_ + _).count()
    System.currentTimeMillis - start
  }

  // around 800-1100 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.combineByKey((x: Int) => x, (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y).count()
    System.currentTimeMillis - start
  }

  // around 1500-1900 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.foldByKey(0)(_ + _).count()
    System.currentTimeMillis - start
  }

  // around 1400-1800 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.aggregateByKey(0)(_ + _, _ + _).count()
    System.currentTimeMillis - start
  }

  // around 5500-6200 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.groupByKey().mapValues(_.reduceLeft(_ + _)).count()
    System.currentTimeMillis - start
  }

Tobias
*ByKey aggregations: performance + order
Hi,

I have an RDD[(Long, MyData)] where I want to compute various functions on lists of MyData items with the same key (these will in general be rather short lists, around 10 items per key). Naturally I was thinking of groupByKey() but was a bit intimidated by the warning:

  "This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance."

Now I don't know (yet) if all of the functions I want to compute can be expressed in this way and I was wondering about *how much* more expensive we are talking about. Say I have something like

  rdd.zipWithIndex.map(kv => (kv._2/10, kv._1)).groupByKey()

i.e. items that will be grouped will 99% live in the same partition (do they?), does this change the performance?

Also, if my operations depend on the order in the original RDD (say, string concatenation), how could I make sure the order of the original RDD is respected?

Thanks
Tobias
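One possible way to handle the ordering question, sketched here as an assumption rather than an answer given in the thread: carry the index from zipWithIndex along with each value, and sort every group by that index before applying the order-sensitive operation. As far as I know, groupByKey makes no promise about the order of values inside a group, so the explicit sort is what restores the original order. The object and method names below are hypothetical, and the core logic uses plain Scala collections so it can be checked without Spark.

```scala
object OrderedGroupConcat {
  // Input: (key, (originalIndex, value)) pairs in arbitrary order, as they
  // might arrive after a shuffle. Sorting each group by originalIndex
  // restores the original order before the order-sensitive concatenation.
  def concatInOriginalOrder(pairs: Seq[(Long, (Long, String))]): Map[Long, String] =
    pairs.groupBy(_._1).map { case (key, group) =>
      key -> group.map(_._2).sortBy(_._1).map(_._2).mkString
    }
}

// Possible Spark counterpart for an rdd: RDD[String] (requires Spark, not run here):
//   rdd.zipWithIndex
//      .map { case (value, idx) => (idx / 10, (idx, value)) }
//      .groupByKey()
//      .mapValues(_.toSeq.sortBy(_._1).map(_._2).mkString)
```

With groups of around 10 items, the extra sort per group should be cheap compared to the shuffle itself.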