aggregateByKey vs combineByKey

2014-09-29 Thread David Rowe
Hi All,

After some hair pulling, I've reached the realisation that an operation I
am currently doing via:

myRDD.groupByKey.mapValues(func)

should be done more efficiently using aggregateByKey or combineByKey. Both
of these methods would do, and they seem very similar to me in terms of
their function.

My question is, what are the differences between these two methods (other
than the slight differences in their type signatures)? Under what
circumstances should I use one or the other?

Thanks

Dave


Re: aggregateByKey vs combineByKey

2014-09-29 Thread David Rowe
Thanks Liquan, that was really helpful.

On Mon, Sep 29, 2014 at 5:54 PM, Liquan Pei liquan...@gmail.com wrote:

 Hi Dave,

 You can replace groupByKey with reduceByKey to improve performance in some
 cases. reduceByKey performs a map-side combine, which can reduce network IO
 and shuffle size, whereas groupByKey does not perform a map-side combine.
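
 For instance, if your func boils down to an associative reduction, here is a
 sketch of that swap (the sum is just an illustration):

 // instead of: myRDD.groupByKey.mapValues(_.sum)
 val totals = myRDD.reduceByKey(_ + _)  // partial sums are combined map-side before the shuffle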

 combineByKey is more general than aggregateByKey. In fact, aggregateByKey,
 reduceByKey and groupByKey are all implemented in terms of combineByKey.
 aggregateByKey is similar to reduceByKey, but you can provide an initial
 value for the aggregation.

 As the name suggests, aggregateByKey is suitable for computing aggregations
 per key, such as sum, avg, etc. The rule of thumb is that the extra
 computation spent on the map-side combine should reduce the amount of data
 sent to other nodes and to the driver. If your function satisfies this rule,
 you should probably use aggregateByKey.

 combineByKey is more general, and you have the flexibility to specify
 whether you'd like to perform a map-side combine. However, it is more
 complex to use. At a minimum, you need to implement three functions:
 createCombiner, mergeValue, and mergeCombiners.
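
 To make the difference concrete, here is a minimal sketch computing a
 per-key (sum, count) both ways (the RDD contents are made up for
 illustration):

 val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

 // aggregateByKey: supply a zero value plus a value-merge and a combiner-merge function
 val sumCounts = pairs.aggregateByKey((0.0, 0L))(
   (acc, v) => (acc._1 + v, acc._2 + 1),
   (a, b) => (a._1 + b._1, a._2 + b._2))

 // combineByKey: the zero value is replaced by createCombiner, which sees the
 // first value for each key in each partition
 val sumCounts2 = pairs.combineByKey(
   (v: Double) => (v, 1L),                                        // createCombiner
   (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1),  // mergeValue
   (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners

 val avgByKey = sumCounts.mapValues { case (sum, count) => sum / count }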

 Hope this helps!
 Liquan






 --
 Liquan Pei
 Department of Physics
 University of Massachusetts Amherst



Re: Where can I find the module diagram of SPARK?

2014-09-23 Thread David Rowe
Hi Andrew,

I can't speak for Theodore, but I would find that incredibly useful.

Dave

On Wed, Sep 24, 2014 at 11:24 AM, Andrew Ash and...@andrewash.com wrote:

 Hi Theodore,

 What do you mean by module diagram?  A high level architecture diagram of
 how the classes are organized into packages?

 Andrew

 On Tue, Sep 23, 2014 at 12:46 AM, Theodore Si sjyz...@gmail.com wrote:

 Hi,

 Please help me with that.

 BR,
 Theodore Si






Re: Issues with partitionBy: FetchFailed

2014-09-21 Thread David Rowe
Hi,

I've seen this problem before, and I'm not convinced it's GC.

When Spark shuffles, it writes a lot of small files to store the data to be
sent to other executors (AFAICT). From what I've read around the place, the
intention is that these files sit in the OS's disk buffers, and since sync()
is never called, they exist only in memory. The problem is that when you
have a lot of shuffle data and the executors are configured to use, say,
90% of your memory, something is going to be pushed to disk: either the JVM
will be swapped out, or the shuffle files will be written out of the cache.

So when these nodes time out, it's not a GC problem; it's that the machine
is actually thrashing.

I've had some success with this problem by reducing the amount of memory
that the executors are configured to use from, say, 90% to 60%. I don't know
the internals of the code, but I'm sure this number is related to the
fraction of your data that's going to be shuffled to other nodes. In any
case, it's not something I can estimate very well for my own jobs; I usually
have to find the right number by trial and error.
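
For what it's worth, on a standalone cluster that knob is essentially the
executor memory setting. A minimal sketch, assuming a 16 GB node (the 10g
figure is just an illustration; it's equivalent to passing
--executor-memory 10g to spark-submit):

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "10g")  // roughly 60% of the node instead of ~90%
val sc = new org.apache.spark.SparkContext(conf)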

Perhaps somebody who knows the internals a bit better can shed some light.

Cheers

Dave

On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi,



 I’ve also met this problem before. I think you can try setting
 “spark.core.connection.ack.wait.timeout” to a larger value to avoid the ack
 timeout; the default is 60 seconds.



 Sometimes, because of a GC pause or other reasons, the acknowledgement
 message will time out, which leads to this exception; you can try setting a
 larger value for this configuration.
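
 A minimal sketch of setting it in code (the 600-second value is just an
 illustration; it can also go in spark-defaults.conf):

 val conf = new org.apache.spark.SparkConf()
   .set("spark.core.connection.ack.wait.timeout", "600")  // seconds, default is 60
 val sc = new org.apache.spark.SparkContext(conf)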



 Thanks

 Jerry



 *From:* Julien Carme [mailto:julien.ca...@gmail.com]
 *Sent:* Sunday, September 21, 2014 7:43 PM
 *To:* user@spark.apache.org
 *Subject:* Issues with partitionBy: FetchFailed



 Hello,

 I am facing an issue with partitionBy, it is not clear whether it is a
 problem with my code or with my spark setup. I am using Spark 1.1,
 standalone, and my other spark projects work fine.

 So I have to repartition a relatively large file (about 70 million lines).
 Here is a minimal version of what is not working fine:

 val myRDD = sc.textFile(...).map { line => (extractKey(line), line) }

 val myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))

 myRepartitionedRDD.saveAsTextFile(...)

 It runs for quite some time, until I get some errors and it retries. The
 errors are:

 FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
 shuffleId=1,mapId=1,reduceId=5)

 The logs are not much more informative. I get:

 Java.io.IOException : sendMessageReliability failed because ack was not
 received within 60 sec



 I get similar errors with all my workers.

 Do you have some kind of explanation for this behaviour? What could be
 wrong?

 Thanks,







Re: Computing mean and standard deviation by key

2014-09-12 Thread David Rowe
I generally call values.stats, e.g.:

val stats = myPairRdd.values.stats

On Fri, Sep 12, 2014 at 4:46 PM, rzykov rzy...@gmail.com wrote:

 Is it possible to use DoubleRDDFunctions
 (https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/rdd/DoubleRDDFunctions.html)
 for calculating mean and std dev for paired RDDs (key, value)?

 Now I'm using an approach with ReduceByKey but want to make my code more
 concise and readable.









Re: Computing mean and standard deviation by key

2014-09-12 Thread David Rowe
Oh I see, I think you're trying to do something like (in SQL):

SELECT order, mean(price) FROM orders GROUP BY order

In this case, I'm not aware of a way to use the DoubleRDDFunctions, since
you have a single RDD of pairs where each pair is of type (KeyType,
Iterable[Double]).

It seems to me that you want to write a function:

def stats(numList: Iterable[Double]): org.apache.spark.util.StatCounter

and then use

pairRdd.mapValues( value => stats(value) )
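
A minimal sketch of that approach, assuming pairRdd is the
RDD[(K, Iterable[Double])] described above (the names are illustrative):

import org.apache.spark.util.StatCounter

// StatCounter's companion object builds a counter from a collection of doubles
def stats(numList: Iterable[Double]): StatCounter = StatCounter(numList)

val statsByKey = pairRdd.mapValues(stats)                        // RDD[(K, StatCounter)]
val meanAndStdevByKey = statsByKey.mapValues(s => (s.mean, s.stdev))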




On Fri, Sep 12, 2014 at 5:05 PM, rzykov rzy...@gmail.com wrote:

 Tried this:

 ordersRDD.join(ordersRDD).map { case ((partnerid, itemid), ((matchedida, pricea), (matchedidb, priceb))) =>
     ((matchedida, matchedidb), (if (priceb > 0) (pricea / priceb).toDouble else 0.toDouble)) }
   .groupByKey
   .values.stats
   .first

 Error:
 console:37: error: could not find implicit value for parameter num:
 Numeric[Iterable[Double]]
   .values.stats




