aggregateByKey vs combineByKey
Hi All,

After some hair pulling, I've reached the realisation that an operation I am currently doing via:

    myRDD.groupByKey.mapValues(func)

should be done more efficiently using aggregateByKey or combineByKey. Both of these methods would do, and they seem very similar to me in terms of their function. My question is: what are the differences between these two methods (other than the slight differences in their type signatures), and under what circumstances should I use one or the other?

Thanks

Dave
Re: aggregateByKey vs combineByKey
Thanks Liquan, that was really helpful.

On Mon, Sep 29, 2014 at 5:54 PM, Liquan Pei liquan...@gmail.com wrote:

Hi Dave,

You can replace groupByKey with reduceByKey to improve performance in some cases. reduceByKey performs a map-side combine, which can reduce network IO and shuffle size, whereas groupByKey will not perform a map-side combine.

combineByKey is more general than aggregateByKey. In fact, the implementations of aggregateByKey, reduceByKey and groupByKey are all built on combineByKey. aggregateByKey is similar to reduceByKey, but you can provide an initial value when performing the aggregation. As the name suggests, aggregateByKey is suitable for computing aggregations over keys, for example sum, avg, etc. The rule here is that the extra computation spent on the map-side combine should reduce the size of the data sent out to other nodes and the driver. If your func satisfies this rule, you probably should use aggregateByKey.

combineByKey is more general, and you have the flexibility to specify whether you'd like to perform a map-side combine. However, it is more complex to use. At a minimum, you need to implement three functions: createCombiner, mergeValue and mergeCombiners.

Hope this helps!
Liquan

--
Liquan Pei
Department of Physics
University of Massachusetts Amherst
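To make the difference concrete, here is a sketch (untested here, assuming spark-core on the classpath and a local-mode SparkContext) of computing a per-key average both ways. The key contrast: aggregateByKey takes a zero value, while combineByKey replaces it with a createCombiner function that sees the first value for each key:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AvgByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("avg-by-key"))
    val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 4.0)))

    // aggregateByKey: a zero value, a within-partition seqOp, and a cross-partition combOp.
    val avgAgg = pairs
      .aggregateByKey((0.0, 0L))(
        (acc, v) => (acc._1 + v, acc._2 + 1),       // seqOp: fold a value into (sum, count)
        (a, b) => (a._1 + b._1, a._2 + b._2))       // combOp: merge two accumulators
      .mapValues { case (sum, count) => sum / count }

    // combineByKey: createCombiner builds the accumulator from the first value seen for a key.
    val avgComb = pairs
      .combineByKey(
        (v: Double) => (v, 1L),                                        // createCombiner
        (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1),  // mergeValue
        (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners
      .mapValues { case (sum, count) => sum / count }

    avgAgg.collect().foreach(println)
    avgComb.collect().foreach(println)
    sc.stop()
  }
}
```

Both perform the map-side combine Liquan describes; the combineByKey form only buys you something extra when the zero-value shape of aggregateByKey doesn't fit.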
Re: Where can I find the module diagram of SPARK?
Hi Andrew,

I can't speak for Theodore, but I would find that incredibly useful.

Dave

On Wed, Sep 24, 2014 at 11:24 AM, Andrew Ash and...@andrewash.com wrote:

Hi Theodore,

What do you mean by module diagram? A high-level architecture diagram of how the classes are organized into packages?

Andrew

On Tue, Sep 23, 2014 at 12:46 AM, Theodore Si sjyz...@gmail.com wrote:

Hi,

Please help me with that.

BR,
Theodore Si

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Issues with partitionBy: FetchFailed
Hi,

I've seen this problem before, and I'm not convinced it's GC. When Spark shuffles, it writes a lot of small files to store the data to be sent to other executors (AFAICT). According to what I've read around the place, the intention is that these files be stored in disk buffers, and since sync() is never called, they exist only in memory. The problem is that when you have a lot of shuffle data and the executors are configured to use, say, 90% of your memory, some of that is going to be written to disk: either the JVM will be swapped out, or the files will be written out of cache. So, when these nodes are timing out, it's not a GC problem; the machine is actually thrashing.

I've had some success with this problem by reducing the amount of memory the executors are configured to use, from say 90% to 60%. I don't know the internals of the code, but I'm sure this number is related to the fraction of your data that's going to be shuffled to other nodes. In any case, it's not something I can estimate very well in my own jobs; I usually have to find the right number by trial and error. Perhaps somebody who knows the internals a bit better can shed some light.

Cheers

Dave

On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai saisai.s...@intel.com wrote:

Hi,

I've also met this problem before. I think you can try setting "spark.core.connection.ack.wait.timeout" to a large value to avoid ack timeouts; the default is 60 seconds. Sometimes, because of GC pauses or other reasons, an acknowledgement message will time out, which leads to this exception, so you can try setting a larger value for this configuration.

Thanks

Jerry

*From:* Julien Carme [mailto:julien.ca...@gmail.com]
*Sent:* Sunday, September 21, 2014 7:43 PM
*To:* user@spark.apache.org
*Subject:* Issues with partitionBy: FetchFailed

Hello,

I am facing an issue with partitionBy. It is not clear whether it is a problem with my code or with my spark setup.
I am using Spark 1.1, standalone, and my other Spark projects work fine. I have to repartition a relatively large file (about 70 million lines). Here is a minimal version of what is not working:

    myRDD = sc.textFile(...).map { line => (extractKey(line), line) }
    myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
    myRepartitionedRDD.saveAsTextFile(...)

It runs for quite some time, until I get some errors and it retries. The errors are:

    FetchFailed(BlockManagerId(3, myWorker2, 52082, 0), shuffleId=1, mapId=1, reduceId=5)

The logs are not much more informative. I get:

    java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec

I get similar errors with all my workers. Do you have some kind of explanation for this behaviour? What could be wrong?

Thanks,
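Putting the two suggestions in this thread together, a configuration sketch might look like the following (the specific values are illustrative assumptions, not recommendations from the thread; spark.core.connection.ack.wait.timeout is the setting Jerry names, and lowering spark.executor.memory is one way to apply Dave's "90% to 60%" advice):

```scala
import org.apache.spark.SparkConf

object ShuffleTimeoutConf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // Jerry's suggestion: raise the ack timeout (default 60s) so nodes that are
      // merely slow during a heavy shuffle are not treated as failed.
      .set("spark.core.connection.ack.wait.timeout", "600")
      // One reading of Dave's suggestion: give executors less of the machine's RAM
      // (value here is hypothetical) so shuffle files can stay in the OS page cache
      // instead of forcing the machine to swap or drop cached files.
      .set("spark.executor.memory", "8g")
    conf.getAll.foreach(println)
  }
}
```

The same keys can equally go in spark-defaults.conf or be passed via --conf on spark-submit.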
Re: Computing mean and standard deviation by key
I generally call values.stats, e.g.:

    val stats = myPairRdd.values.stats

On Fri, Sep 12, 2014 at 4:46 PM, rzykov rzy...@gmail.com wrote:

Is it possible to use DoubleRDDFunctions https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/rdd/DoubleRDDFunctions.html for calculating mean and std dev for paired RDDs (key, value)? Right now I'm using an approach with reduceByKey, but I want to make my code more concise and readable.
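In full, that suggestion looks something like this (a sketch assuming a local-mode SparkContext; note it aggregates over all values, ignoring keys):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ValuesStats {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("values-stats"))
    val myPairRdd = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 5.0)))

    // .values yields an RDD[Double], which picks up DoubleRDDFunctions implicitly;
    // .stats() computes count, mean, stdev, max and min in a single pass.
    val stats = myPairRdd.values.stats()
    println(stats.mean)
    println(stats.stdev)
    sc.stop()
  }
}
```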
Re: Computing mean and standard deviation by key
Oh I see, I think you're trying to do something like (in SQL):

    SELECT order, mean(price) FROM orders GROUP BY order

In this case, I'm not aware of a way to use the DoubleRDDFunctions, since you have a single RDD of pairs where each pair is of type (KeyType, Iterable[Double]). It seems to me that you want to write a function:

    def stats(numList: Iterable[Double]): org.apache.spark.util.StatCounter

and then use:

    pairRdd.mapValues(value => stats(value))

On Fri, Sep 12, 2014 at 5:05 PM, rzykov rzy...@gmail.com wrote:

Tried this:

    ordersRDD.join(ordersRDD)
      .map { case ((partnerid, itemid), ((matchedida, pricea), (matchedidb, priceb))) =>
        ((matchedida, matchedidb), if (priceb > 0) (pricea / priceb).toDouble else 0.toDouble) }
      .groupByKey
      .values.stats
      .first

Error:

    console:37: error: could not find implicit value for parameter num: Numeric[Iterable[Double]]
           .values.stats
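Tying this back to the aggregateByKey thread above: StatCounter can serve directly as the per-key accumulator, which avoids materialising the Iterable[Double] that groupByKey produces. A sketch (untested here, assuming a local-mode SparkContext and hypothetical sample data):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.StatCounter

object StatsByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("stats-by-key"))
    val prices = sc.parallelize(Seq(("order1", 10.0), ("order1", 20.0), ("order2", 5.0)))

    // StatCounter.merge(v) folds in one value and StatCounter.merge(other) combines
    // two counters, so the pair fits aggregateByKey's (seqOp, combOp) shape directly.
    val statsByKey = prices.aggregateByKey(new StatCounter())(
      (acc, v) => acc.merge(v),   // seqOp: add one value within a partition
      (a, b) => a.merge(b))       // combOp: combine partial counters across partitions

    statsByKey.mapValues(s => (s.mean, s.stdev)).collect().foreach(println)
    sc.stop()
  }
}
```

This computes mean and standard deviation per key with a map-side combine, whereas the groupByKey + stats approach ships every raw value across the shuffle first.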