Re: Confused by groupByKey() and the default partitioner

2014-07-13 Thread Guanhua Yan
Thanks, Aaron. I replaced groupByKey with reduceByKey plus some list
concatenation operations, and found that the performance became even worse.
So groupByKey is not that bad in my code.

Best regards,
- Guanhua
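
For reference, the replacement described above presumably looked something
like the following sketch (a hypothetical reconstruction, assuming a pair
RDD x of (key, value) tuples). Wrapping every value in a one-element list
and concatenating lists pairwise allocates a new list per merge, which is
consistent with the slowdown observed:

    # Hypothetical reconstruction: emulate groupByKey() with reduceByKey().
    # Each value becomes a one-element list, and lists are concatenated
    # pairwise; the repeated list allocations make this costly.
    grouped = (x.mapValues(lambda v: [v])
                .reduceByKey(lambda a, b: a + b))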




Re: Confused by groupByKey() and the default partitioner

2014-07-13 Thread Aaron Davidson
Ah -- I should have been clearer: list concatenation isn't going to be any
faster. In many cases I've seen people use groupByKey() when they are really
trying to do some sort of aggregation, and constructing the concatenated
list is more expensive than they need.
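
To illustrate the point: when the end goal is an aggregate such as a per-key
sum, reduceByKey combines values on each partition before the shuffle, while
groupByKey ships every value across the network first. A minimal sketch,
assuming a running SparkContext sc:

    pairs = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

    # Aggregate directly: partial sums are computed map-side on each
    # partition, so far less data is shuffled.
    sums = pairs.reduceByKey(lambda a, b: a + b)

    # The groupByKey anti-pattern for the same result: every value is
    # shuffled and materialized into a collection before being summed.
    sums_slow = pairs.groupByKey().mapValues(sum)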






Confused by groupByKey() and the default partitioner

2014-07-12 Thread Guanhua Yan
Hi:

I have trouble understanding the default partitioner (hash) in Spark.
Suppose that an RDD with two partitions is created as follows:

x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

Does Spark partition x based on the hash of the key (e.g., "a", "b", "c")
by default?

(1) Assuming this is correct, if I further use the groupByKey primitive,
x.groupByKey(), all the records sharing the same key should be located in
the same partition. Then it's not necessary to shuffle the data records
around, as all the grouping operations can be done locally.

(2) If it's not true, how could I specify a partitioner simply based on the
hashing of the key (in Python)?
Thank you,
- Guanhua
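
One way to check this empirically (a quick sketch, assuming a running
SparkContext sc): parallelize() only splits the input sequence positionally,
so keys are not hash-partitioned until a shuffle operation such as
groupByKey() runs.

    x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

    # Positional split: [("a", 1), ("b", 4)] and [("a", 10), ("c", 7)],
    # so the two "a" records start out in different partitions.
    print(x.glom().collect())

    # After groupByKey(), records have been hash-partitioned by key, so
    # both "a" records end up together.
    print(x.groupByKey().mapValues(list).glom().collect())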




Re: Confused by groupByKey() and the default partitioner

2014-07-12 Thread Aaron Davidson
Yes, groupByKey() does partition by the hash of the key unless you specify
a custom Partitioner.

(1) If you were to use groupByKey() when the data was already partitioned
correctly, the data would indeed not be shuffled. Here is the associated
code; you'll see that it simply checks that the Partitioner the groupBy()
is looking for is equal to the Partitioner of the pre-existing RDD:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L89

By the way, I should warn you that groupByKey() is not a recommended
operation if you can avoid it, as it has non-obvious performance issues
when running on large datasets.
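
For question (2), in PySpark an explicit hash partitioning can be requested
with partitionBy(), which hashes each key by default; in recent versions, a
later groupByKey() with the same number of partitions can then reuse that
partitioning instead of shuffling again. A minimal sketch, assuming a
running SparkContext sc:

    x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

    # One shuffle: partitionBy() assigns each record a partition by
    # hashing its key (portable_hash by default).
    x_hashed = x.partitionBy(2)

    # Grouping with the same partition count can now be done locally,
    # since all records with a given key are already co-located.
    grouped = x_hashed.groupByKey(2)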

