RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-24 Thread Shuai Zheng
Hi Imran,

Your explanation was extremely helpful :)

I tested some ideas based on your explanation, and it makes perfect sense to
me. I modified my code to use cogroup+mapValues instead of union+reduceByKey to
preserve the partitioning, which gives me more than a 100% performance gain
(for the loop part).
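
For readers following along, here is a minimal sketch of what a
cogroup+mapValues version of the loop might look like. It is not the code from
this thread: the tiny link graph, the names partitioner and leaked, and the
three iterations are made up for illustration, and SparkContext.getOrCreate
assumes Spark 1.4 or later. Passing the partitioner to cogroup explicitly is
one way to keep ranks on the same partitioner as links across iterations.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Local context for the sketch; inside spark-shell this returns the existing context.
val conf = new SparkConf().setAppName("cogroup-sketch").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)

val beta = 0.8
val partitioner = new HashPartitioner(6)

// Hypothetical link graph: page -> outgoing links.
val rawLinks = sc.parallelize(Seq(1 -> Seq(2, 3), 2 -> Seq(3), 3 -> Seq(1)))
val links = rawLinks.partitionBy(partitioner).persist()

var ranks = links.mapValues(_ => 1.0)
val leaked = links.mapValues(_ => 1.0 - beta).persist()

for (_ <- 1 to 3) {
  // flatMap changes the keys, so contributions has no partitioner.
  val contributions = links.join(ranks).flatMap {
    case (_, (dests, rank)) => dests.map(dest => (dest, rank / dests.size * beta))
  }
  // cogroup onto the shared partitioner, then sum both sides per key.
  // mapValues keeps the partitioner, so the next join sees co-partitioned inputs.
  ranks = leaked.cogroup(contributions, partitioner).mapValues {
    case (leak, contribs) => leak.sum + contribs.sum
  }
}

println(ranks.partitioner)
```

Only contributions needs to be shuffled in each iteration under this layout;
leaked and links are already partitioned the way cogroup and join expect.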

 

Thanks a lot!

I am also curious: is there an easy way to get a detailed description of the
DAG execution plan without running the code, like the explain command in Pig
or SQL?

Shuai
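
One option that may come close to this is RDD.toDebugString, which returns a
text description of an RDD's lineage without running a job. A minimal sketch
with made-up data follows; the names edges, grouped and plan are illustrative,
SparkContext.getOrCreate assumes Spark 1.4 or later, and the exact output
format varies by Spark version.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Local context for the sketch; inside spark-shell this returns the existing context.
val conf = new SparkConf().setAppName("lineage-sketch").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical edge list. Nothing below is an action, so no job is run.
val edges = sc.parallelize(Seq(1 -> 2, 1 -> 3, 2 -> 3))
val grouped = edges.groupByKey(new HashPartitioner(2))

// toDebugString describes this RDD and its recursive dependencies.
val plan = grouped.mapValues(_.size).toDebugString
println(plan)
```

The printed lineage lists each RDD in the chain, and the ShuffledRDD entry
shows where a stage boundary would fall once an action is called.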

 


RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shuai Zheng
This also raises an interesting question: how can I do this locally in code
if I want to? For example, I have RDDs A and B that share the same
partitioning. If I want to join A to B, I might just want to do a map-side
join (B itself might be big, but B's local partition is known to be small
enough to fit in memory). How can I access the other RDD's local partition
inside the mapPartitions method? Is there any way to do this in Spark?

 


 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

 

Hi All,

 

I am running a simple PageRank program, but it is slow. I found that part of
the reason is that a shuffle happens when I call union, even though both RDDs
share the same partitioner:

 

Below is my test code in spark shell:

 

import org.apache.spark.HashPartitioner

// Note: getConf returns a copy of the configuration, so this call does not
// change the serializer of the running context.
sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val beta = 0.8
val numOfPartition = 6

// Parse "src<TAB>dst" lines, skipping comment lines that contain '#'.
val links = sc.textFile("c:/Download/web-Google.txt").
  filter(!_.contains("#")).
  map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) }).
  groupByKey.
  partitionBy(new HashPartitioner(numOfPartition)).
  persist

var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size * beta))
  }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}

ranks.lookup(1)

 

In the code above, links is joined with ranks, which should preserve the
partitioning, and leakedMatrix shares the same partitioning, so I expected no
shuffle on contributions.union(leakedMatrix) or on the reduceByKey that
follows. But in the end there is a shuffle write for every step: map,
groupByKey, union, partitionBy, etc.

I expected only a single shuffle, with everything after that being local
operations, but the screen shows otherwise. Am I misunderstanding something
here?

 





RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shao, Saisai
If you call reduceByKey(), Spark will internally introduce a shuffle
operation, no matter whether the data is already partitioned locally; Spark
itself does not know that the data is already well partitioned.

So if you want to avoid the shuffle, you have to write the code explicitly to
avoid it, as far as I understand. You can call mapPartitions to get a
partition of data and reduce by key locally with your own logic.

Thanks
Saisai
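
A minimal sketch of the mapPartitions approach described above, with made-up
data. The names data, sums and reduced are illustrative, and
SparkContext.getOrCreate assumes Spark 1.4 or later. The local reduction is
only correct under the assumption that all values for a key already sit in
the same partition, which partitionBy arranges here.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import scala.collection.mutable

// Local context for the sketch; inside spark-shell this returns the existing context.
val conf = new SparkConf().setAppName("local-reduce-sketch").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)

val partitioner = new HashPartitioner(4)

// Hypothetical (key, value) pairs, partitioned by key up front.
val raw = sc.parallelize(Seq(1 -> 1.0, 1 -> 2.0, 2 -> 3.0, 3 -> 4.0, 3 -> 0.5))
val data = raw.partitionBy(partitioner)

// Sum the values per key inside each partition, without moving any data.
// preservesPartitioning = true is safe because the keys are not changed.
val reduced = data.mapPartitions({ iter =>
  val sums = mutable.HashMap.empty[Int, Double]
  iter.foreach { case (k, v) => sums(k) = sums.getOrElse(k, 0.0) + v }
  sums.iterator
}, preservesPartitioning = true)

reduced.collect().foreach(println)
```

Keeping the partitioner on the result means later key-based operations on
reduced can still see how the data is laid out.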



RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shuai Zheng
In the book Learning Spark:

[image: excerpt from the book, not preserved in the archive]

So does this mean that no shuffle happens across the network, but a shuffle
still happens locally? Even if that is the case, why would union trigger a
shuffle? I thought union just appends the RDDs together.

 


 





RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shao, Saisai
I think some RDD APIs, such as zipPartitions, can do what you want. I might
check the docs.

Thanks
Jerry
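
A minimal sketch of how zipPartitions could be used for the map-side join
asked about in this thread. The data and the names a, b, bLocal and joined are
made up, SparkContext.getOrCreate assumes Spark 1.4 or later, and the sketch
assumes that both RDDs share one partitioner, that keys in B are unique, and
that each partition of B fits in memory.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Local context for the sketch; inside spark-shell this returns the existing context.
val conf = new SparkConf().setAppName("zip-partitions-sketch").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)

val partitioner = new HashPartitioner(4)

// Same partitioner on both sides, so partition i of `a` and partition i of `b`
// hold the same set of keys.
val a = sc.parallelize(Seq(1 -> "a1", 2 -> "a2", 3 -> "a3")).partitionBy(partitioner)
val b = sc.parallelize(Seq(1 -> 10.0, 3 -> 30.0)).partitionBy(partitioner)

val joined = a.zipPartitions(b, preservesPartitioning = true) { (aIter, bIter) =>
  // Load B's local partition into memory, then stream through A's partition.
  val bLocal = bIter.toMap
  aIter.flatMap { case (k, av) =>
    bLocal.get(k).map(bv => (k, (av, bv))).iterator
  }
}

joined.collect().foreach(println)
```

zipPartitions requires both RDDs to have the same number of partitions, which
the shared partitioner guarantees in this sketch.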



RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shao, Saisai
I have no context for this book. AFAIK, union will not trigger a shuffle,
since it just puts the partitions together; it is the reduceByKey() operator
that actually triggers the shuffle.

Thanks
Jerry
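
One way to check which step carries the shuffle is to look at an RDD's direct
dependencies. The sketch below uses made-up data and a hypothetical helper,
shufflesDirectly; SparkContext.getOrCreate assumes Spark 1.4 or later. It also
includes a co-partitioned case, which assumes Spark 1.3 or later, where union
keeps a partitioner shared by both inputs.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Local context for the sketch; inside spark-shell this returns the existing context.
val conf = new SparkConf().setAppName("union-shuffle-sketch").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical helper: true if the RDD reads its direct parents through a shuffle.
def shufflesDirectly(rdd: RDD[_]): Boolean =
  rdd.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

val a = sc.parallelize(Seq(1 -> 1.0, 2 -> 2.0))
val b = sc.parallelize(Seq(1 -> 0.5, 3 -> 3.0))

// No partitioner on either input: union is narrow, reduceByKey shuffles.
val unioned = a.union(b)
val reduced = unioned.reduceByKey(_ + _)

// Both inputs share a partitioner, and reduceByKey is given the same one.
val p = new HashPartitioner(4)
val unionedP = a.partitionBy(p).union(b.partitionBy(p))
val reducedP = unionedP.reduceByKey(p, _ + _)

println(s"union:                        ${shufflesDirectly(unioned)}")
println(s"reduceByKey:                  ${shufflesDirectly(reduced)}")
println(s"co-partitioned union:         ${shufflesDirectly(unionedP)}")
println(s"reduceByKey on co-partitioned: ${shufflesDirectly(reducedP)}")
```

Under those assumptions, only the plain reduceByKey should report a direct
shuffle dependency.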



Re: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Imran Rashid
I think you're getting tripped up by lazy evaluation and the way stage
boundaries work (admittedly it's pretty confusing in this case).

It is true that up until recently, if you unioned two RDDs with the same
partitioner, the result did not have the same partitioner.  But that was
just fixed here:
https://github.com/apache/spark/pull/4629

That does mean that after you update ranks, it will no longer have a
partitioner, which will affect the join on your second iteration here:
 val contributions = links.join(ranks).flatMap

But I think most of the shuffles you are pointing to are a different
issue.  I may be belaboring something you already know, but I think this is
easy to get confused by.  I think the first thing is understanding where you
get stage boundaries, and how they are named.  Each shuffle introduces a
stage boundary.  However, each stage is named after the last thing in the
stage, which is not always what is actually causing the shuffle.  E.g.,
reduceByKey() causes a shuffle, but we don't see that in a stage name.
Similarly, map() does not cause a shuffle, but we see a stage with that name.

So, what do the stage boundaries we see actually correspond to?

1) map -- that is doing the shuffle write for the following groupByKey
2) groupByKey -- in addition to reading the shuffle output from your map,
this is *also* doing the shuffle write for the next shuffle you introduce
w/ partitionBy
3) union -- this is doing the shuffle read from your partitionBy, and
then all the work from there right up until the shuffle write for what is
immediately after union -- your reduceByKey.
4) lookup is an action, which is why that has another stage.

A couple of things to note:
(a) your join does not cause a shuffle, b/c both RDDs share a partitioner
(b) you have two shuffles from groupByKey followed by partitionBy -- you
probably want the one-argument form, groupByKey(partitioner)


Hopefully this helps you understand how your stages and shuffles
correspond to your code.

Imran
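
To illustrate point (b), here is a minimal sketch comparing the two forms by
counting shuffle dependencies in the lineage. The data and the helper
countShuffles are hypothetical, SparkContext.getOrCreate assumes Spark 1.4 or
later, and the sketch assumes spark.default.parallelism is not set, so that
the default partitioner of groupByKey() differs from the six-partition one.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Local context for the sketch; inside spark-shell this returns the existing context.
val conf = new SparkConf().setAppName("group-by-key-sketch").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical helper: counts shuffle dependencies along a linear lineage.
def countShuffles(rdd: RDD[_]): Int =
  rdd.dependencies.map { dep =>
    val here = if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) 1 else 0
    here + countShuffles(dep.rdd)
  }.sum

val partitioner = new HashPartitioner(6)

// Made-up edge list, split into 2 input partitions.
val edges = sc.parallelize(Seq(1 -> 2, 1 -> 3, 2 -> 3, 3 -> 1), 2)

// Shuffles once to group, then again to move onto the 6-partition layout.
val twoStep = edges.groupByKey().partitionBy(partitioner)

// Groups directly onto the target partitioner in a single shuffle.
val oneStep = edges.groupByKey(partitioner)

println(s"groupByKey().partitionBy(p): ${countShuffles(twoStep)} shuffle(s)")
println(s"groupByKey(p):               ${countShuffles(oneStep)} shuffle(s)")
```

Both results carry the same partitioner, so downstream joins against other
RDDs on that partitioner behave the same way in either form.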


