[jira] [Comment Edited] (SPARK-2044) Pluggable interface for shuffles

2014-06-09 Thread Weihua Jiang (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14021716#comment-14021716 ]

Weihua Jiang edited comment on SPARK-2044 at 6/9/14 7:11 AM:

Hi Matei,

Thanks a lot for your reply.

1. I am confused about your idea of a sorting flag. 
{quote}
The goal is to allow diverse shuffle implementations, so it doesn't make sense 
to add a flag for it. If we add a flag, every ShuffleManager will need to 
implement this feature. Instead we're trying to make the smallest interface 
that the code consuming this data needs, so that we can try multiple 
implementations of ShuffleManager and see which of these features work best.
The Ordering object means that keys are comparable. This flag here would be to 
tell the ShuffleManager to sort the data, so that downstream algorithms like 
joins can work more efficiently.
{quote}
Your first statement says you want to keep the interface minimal, and thus no 
need-to-sort flag is allowed. But your second statement allows the user to ask 
the ShuffleManager to sort the data.
From my point of view, it is better to have such a flag so that the user can 
ask the ShuffleManager to perform a sort. That way, an operation like SQL 
"order by" can be implemented more efficiently. The ShuffleManager can provide 
a utility class that performs general sorting so that not every implementation 
needs to implement its own sorting logic; a sketch of what I mean follows below.
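
All names in this sketch (ShuffleRegistration, SortUtility, sortOutput) are 
hypothetical, chosen only to illustrate the shape of the flag and the shared 
utility; this is not the interface from the design doc:
{code:scala}
// Hypothetical sketch: a need-to-sort flag alongside the Ordering, plus a
// shared utility so implementations need not each write their own sort.
trait ShuffleRegistration[K, V] {
  def keyOrdering: Option[Ordering[K]] // defined => keys are comparable
  def sortOutput: Boolean              // true => caller asks for sorted output
}

object SortUtility {
  // Generic helper a ShuffleManager could reuse: sorts only when an
  // ordering exists and the caller requested sorted output.
  def sortIfRequested[K, V](
      records: Iterator[(K, V)],
      keyOrdering: Option[Ordering[K]],
      sortOutput: Boolean): Iterator[(K, V)] =
    keyOrdering match {
      case Some(ord) if sortOutput => records.toSeq.sortBy(_._1)(ord).iterator
      case _                       => records
    }
}
{code}
The flag stays advisory: an implementation that already produces sorted output 
can simply skip the utility.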

2. I agree that, for ShuffleReader, reading a partition range is more efficient. 
However, if we want to break the barrier between the map and reduce stages, we 
will encounter a situation where, when a reducer starts, not all of its 
partitions are ready. With a partition range, the reducer must wait for all 
partitions to be ready before it can execute. It is better if the reducer can 
start execution when some (but not all) partitions are ready. The POC code can 
be found at https://github.com/lirui-intel/spark/tree/removeStageBarrier. This 
is why I think we need another read() function that takes a partition list 
instead of a range.
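
A sketch of the distinction, with hypothetical signatures (the range-based 
method paraphrases the proposal; the list-based overload is the addition I am 
arguing for):
{code:scala}
// Hypothetical sketch, not the signatures from the design doc.
trait ShuffleReader[K, C] {
  // Proposed style: read a contiguous range of reduce partitions.
  // Efficient, but the reducer must wait until the whole range is ready.
  def read(startPartition: Int, endPartition: Int): Iterator[Product2[K, C]]

  // Suggested addition: read an explicit list of partitions, so a reducer
  // can start on whichever partitions are ready and fetch the rest later.
  def read(partitionIds: Seq[Int]): Iterator[Product2[K, C]]
}
{code}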



[jira] [Comment Edited] (SPARK-2044) Pluggable interface for shuffles

2014-06-05 Thread Raymond Liu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14019531#comment-14019531 ]

Raymond Liu edited comment on SPARK-2044 at 6/6/14 3:11 AM:


Hi Matei,

Regarding the changes to the block manager:
{quote}
That will allow ShuffleManagers to reuse a common block manager. 
However the interface also allows ShuffleManagers to try new approaches.
{quote}
Have you figured out what that interface should look like? I see the shuffle 
writer/reader interface is generalized to Product2, while eventually the 
specific shuffle module will interact with the disk and go through the 
BlockManager. Do you expect it to be Product2 when talking with the 
DiskBlockManager, or will you keep the current implementation using Files, 
where a lot of shortcuts are involved in various components (say shuffle, 
spill, etc.)? Or something else, like a buffer, an iterator, etc.?

Since we also have pluggable storage support in mind (SPARK-1733), the actual 
IO for a store, even a disk store, might not always go through a File 
interface; hence this question.
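
To illustrate the question, here is a rough sketch of a store abstraction in 
the spirit of SPARK-1733; the names (ShuffleBlockStore, LocalFileBlockStore) 
are made up for illustration and are not from either proposal:
{code:scala}
import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer
import java.nio.file.Files

// The shuffle writer sees records as Product2 pairs; the open question is
// what it hands to storage. One answer: a store contract that is not
// necessarily backed by Files.
trait ShuffleBlockStore {
  def putBytes(blockId: String, bytes: ByteBuffer): Unit
  def getBytes(blockId: String): Option[ByteBuffer]
}

// A file-backed store preserves today's File shortcuts; a memory- or
// Tachyon-backed store could satisfy the same contract without Files.
class LocalFileBlockStore(dir: File) extends ShuffleBlockStore {
  def putBytes(blockId: String, bytes: ByteBuffer): Unit = {
    val channel = new FileOutputStream(new File(dir, blockId)).getChannel
    try channel.write(bytes) finally channel.close()
  }
  def getBytes(blockId: String): Option[ByteBuffer] = {
    val f = new File(dir, blockId)
    if (f.exists()) Some(ByteBuffer.wrap(Files.readAllBytes(f.toPath))) else None
  }
}
{code}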





> Pluggable interface for shuffles
> 
>
> Key: SPARK-2044
> URL: https://issues.apache.org/jira/browse/SPARK-2044
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Matei Zaharia
>Assignee: Matei Zaharia
> Attachments: Pluggableshuffleproposal.pdf
>
>
> Given that a lot of the current activity in Spark Core is in shuffles, I 
> wanted to propose factoring out shuffle implementations in a way that will 
> make experimentation easier. Ideally we will converge on one implementation, 
> but for a while, this could also be used to have several implementations 
> coexist. I'm suggesting this because I'm aware of at least three efforts to 
> look at shuffle (from Yahoo!, Intel and Databricks). Some of the things 
> people are investigating are:
> * Push-based shuffle where data moves directly from mappers to reducers
> * Sorting-based instead of hash-based shuffle, to create fewer files (helps a 
> lot with file handles and memory usage on large shuffles)
> * External spilling within a key
> * Changing the level of parallelism or even algorithm for downstream stages 
> at runtime based on statistics of the map output (this is a thing we had 
> prototyped in the Shark research project but never merged in core)
> I've attached a design doc with a proposed interface. It's not too crazy 
> because the interface between shuffles and the rest of the code is already 
> pretty narrow (just some iterators for reading data and a writer interface 
> for writing it). Bigger changes will be needed in the interaction with 
> DAGScheduler and BlockManager for some of the ideas above, but we can handle 
> those separately, and this interface will allow us to experiment with some 
> short-term stuff sooner.
> If things go well I'd also like to send a sort-based shuffle implementation 
> for 1.1, but we'll see how the timing on that works out.
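
For reference, the narrow interface the description mentions (iterators for 
reading data, a writer interface for writing it) might look roughly like the 
sketch below. Names and signatures are illustrative only; the actual proposal 
is in the attached Pluggableshuffleproposal.pdf:
{code:scala}
// Illustrative sketch -- a pluggable shuffle boils down to three small pieces.
trait ShuffleHandle { def shuffleId: Int }

trait ShuffleWriter[K, V] {
  // Map side: consume this task's records and persist them for reducers.
  def write(records: Iterator[Product2[K, V]]): Unit
}

trait ShuffleReader[K, C] {
  // Reduce side: an iterator over the records destined for this reducer.
  def read(): Iterator[Product2[K, C]]
}

trait ShuffleManager {
  // Register a shuffle when the dependency is created.
  def registerShuffle(shuffleId: Int, numMaps: Int): ShuffleHandle
  // Obtain a writer for one map task.
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int): ShuffleWriter[K, V]
  // Obtain a reader for a range of reduce partitions.
  def getReader[K, C](handle: ShuffleHandle,
      startPartition: Int, endPartition: Int): ShuffleReader[K, C]
}
{code}
Alternative implementations (hash-based, sort-based, push-based) could then be 
swapped behind ShuffleManager without touching the code that consumes the data.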


