[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-08-15 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099136#comment-14099136
 ] 

Patrick Wendell commented on SPARK-2044:


A lot of this has been fixed in 1.1 so I moved target version to 1.2. [~matei] 
we can also close this with fixVersion=1.1.0 if you consider the initial issue 
fixed.

> Pluggable interface for shuffles
> 
>
> Key: SPARK-2044
> URL: https://issues.apache.org/jira/browse/SPARK-2044
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Matei Zaharia
>Assignee: Matei Zaharia
> Attachments: Pluggableshuffleproposal.pdf
>
>
> Given that a lot of the current activity in Spark Core is in shuffles, I 
> wanted to propose factoring out shuffle implementations in a way that will 
> make experimentation easier. Ideally we will converge on one implementation, 
> but for a while, this could also be used to have several implementations 
> coexist. I'm suggesting this because I am aware of at least three efforts to 
> look at shuffle (from Yahoo!, Intel and Databricks). Some of the things 
> people are investigating are:
> * Push-based shuffle where data moves directly from mappers to reducers
> * Sorting-based instead of hash-based shuffle, to create fewer files (helps a 
> lot with file handles and memory usage on large shuffles)
> * External spilling within a key
> * Changing the level of parallelism or even algorithm for downstream stages 
> at runtime based on statistics of the map output (this is a thing we had 
> prototyped in the Shark research project but never merged in core)
> I've attached a design doc with a proposed interface. It's not too crazy 
> because the interface between shuffles and the rest of the code is already 
> pretty narrow (just some iterators for reading data and a writer interface 
> for writing it). Bigger changes will be needed in the interaction with 
> DAGScheduler and BlockManager for some of the ideas above, but we can handle 
> those separately, and this interface will allow us to experiment with some 
> short-term stuff sooner.
> If things go well I'd also like to send a sort-based shuffle implementation 
> for 1.1, but we'll see how the timing on that works out.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-05 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14019511#comment-14019511
 ] 

Saisai Shao commented on SPARK-2044:


Hi Matei, it's great to see that you have plans for the shuffle. We have also 
implemented a pluggable shuffle manager and are planning to submit a PR. I think 
the basic idea is much the same; would you mind taking a look at our 
implementation 
(https://github.com/jerryshao/apache-spark/tree/shuffle-write-improvement/core/src/main/scala/org/apache/spark/storage/shuffle)?
Also, I'm wondering whether I can contribute to this proposal or whether there 
is a chance to cooperate. Thanks a lot.


[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-05 Thread Raymond Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14019531#comment-14019531
 ] 

Raymond Liu commented on SPARK-2044:


Hi Matei

Regarding the changes to the block manager:

That will allow ShuffleManagers to reuse a common block manager. 
However the interface also allows ShuffleManagers to try new approaches. 

Have you figured out what the interface should look like? I see that the 
shuffle writer/reader interface is generalized to Product2, while eventually 
the specific shuffle module will interact with the disk and go through the 
BlockManager. Do you expect it to be Product2 when talking to the 
DiskBlockManager, or will you keep the current implementation based on files, 
where a lot of shortcuts are involved in various components such as shuffle and 
spill? Or something else, like a buffer or an iterator?

I ask because we also have pluggable storage support in mind (SPARK-1733), and 
the actual I/O for a store, even the DiskStore, might not always go through the 
file interface.




[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-05 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14019590#comment-14019590
 ] 

Matei Zaharia commented on SPARK-2044:
--

Saisai, regarding the pluggable implementation -- if you would like to do it 
based on this doc, be my guest. I see there are a few API differences in your 
version (e.g. I want to be able to request a range of reduce keys, and pass an 
Ordering and an Aggregator to the shuffle). The other issue I ran into is that 
I want to hide the MapOutputTracker behind the ShuffleManager, which I think 
you aren't doing right now. This requires changing DAGScheduler a bit in how it 
interacts with the tracker. The reason is that we found keeping track of a 
lot of info for each map (in particular the size of its output for each reduce) 
is expensive, and it might be nice to abstract this and try different versions 
of it (e.g. one where reduce tasks query the size from the node they want to 
fetch from).

I've pushed my work in progress (still incomplete) to 
https://github.com/mateiz/spark/tree/pluggable-shuffle/core/src/main/scala/org/apache/spark/shuffle.

Raymond, regarding the BlockManager, we haven't thought much about the 
interface there. We want to implement sort-based shuffle using the current one 
if possible but it would be good to hear ideas. Basically there are two things 
you want -- to write in a block / file (one issue Yahoo brought up is that 
they'd like these to be bigger than 2 GB) and to fetch a *range* of a block 
remotely (which we sort of hard-code for our current consolidation approach).
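
To make the API shape discussed above a little more concrete, here is a minimal 
in-memory sketch in Scala of a shuffle that is given an Aggregator and an 
optional key Ordering up front and serves a range of reduce partitions through 
a single read call. The names InMemoryShuffle, write, read and partitionOf, the 
hash partitioning, and the half-open range convention are all assumptions made 
for illustration; this is not the interface from the design doc or from the 
work-in-progress branch.

```scala
import scala.collection.mutable

object ShuffleSketch {
  // Stand-in for the Aggregator mentioned above: how to fold values into a combiner.
  final case class Aggregator[V, C](createCombiner: V => C, mergeValue: (C, V) => C)

  // Hypothetical in-memory shuffle; a real implementation would write blocks or files.
  final class InMemoryShuffle[K, V, C](
      numPartitions: Int,
      aggregator: Aggregator[V, C],
      keyOrdering: Option[Ordering[K]] = None) {

    // One buffer of raw records per reduce partition.
    private val buckets =
      Vector.fill(numPartitions)(mutable.ArrayBuffer.empty[(K, V)])

    // Hash partitioning (an assumption), kept non-negative.
    private def partitionOf(key: K): Int = {
      val h = key.hashCode % numPartitions
      if (h < 0) h + numPartitions else h
    }

    // Map side: route each record to the bucket of its reduce partition.
    def write(records: Iterator[(K, V)]): Unit =
      records.foreach { case (k, v) => buckets(partitionOf(k)) += ((k, v)) }

    // Reduce side: one reader for the half-open range [startPartition, endPartition).
    def read(startPartition: Int, endPartition: Int): Iterator[(K, C)] = {
      val combined = mutable.LinkedHashMap.empty[K, C]
      for (p <- startPartition until endPartition; (k, v) <- buckets(p)) {
        combined(k) = combined.get(k) match {
          case Some(c) => aggregator.mergeValue(c, v)
          case None    => aggregator.createCombiner(v)
        }
      }
      val pairs = combined.toSeq
      // Sort only when the caller supplied an Ordering for the keys.
      keyOrdering.fold(pairs)(ord => pairs.sortBy(_._1)(ord)).iterator
    }
  }
}
```

Passing the Aggregator and Ordering to the shuffle, rather than applying them in 
the calling code, leaves each implementation free to decide where combining and 
sorting happen (map side, reduce side, or both).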



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-05 Thread Weihua Jiang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14019604#comment-14019604
 ] 

Weihua Jiang commented on SPARK-2044:
-

Hi Matei,

Some quick comments:
1. Is it a goal to support more kinds of shuffle, e.g. moving the sort from the 
reducer to the mapper? If yes, it seems better to add an additional flag to 
ShuffleManager. I find similar statements on page 3: 
??When the shuffle has no Aggregator (i.e. null or None is passed in), keys and 
values are simply sent across the network. Optionally we might allow the 
ShuffleManager to specify whether keys read from a ShuffleReader are sorted, or 
add a flag to registerShuffle that requests this for keys that have an 
Ordering. This would simplify grouping operators downstream (e.g. cogroup).??
Does this mean that ordering is an inherent property of the input data, or that 
the ShuffleManager is expected to sort the data?
2. Is it a goal to support prefetching of map data on the reducer side?
3. For ShuffleReader, why is only a partition range allowed? How about extending 
this API to support multiple individual partitions? For example, if the reducer 
knows that partitions 1,3,5 are ready while 2,4,6 are not, it can fetch 1,3,5 
first. Making one call instead of 3 calls to getReader can reduce mapper-side 
disk seek operations, e.g. if partitions 3,5 are contiguous on one node.
4. I am not sure whether such a partition list or range should return one reader 
instance or multiple ones.




[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-05 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14019617#comment-14019617
 ] 

Saisai Shao commented on SPARK-2044:


Hi Matei, thanks for your reply. I will carefully read your doc and follow your 
work. So where should I start?



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-06 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020087#comment-14020087
 ] 

Matei Zaharia commented on SPARK-2044:
--

{quote}
1. Is it a goal to support more kinds of shuffle, e.g. moving the sort from the 
reducer to the mapper? If yes, it seems better to add an additional flag to 
ShuffleManager. I find similar statements on page 3: 
{quote}
The goal is to allow diverse shuffle implementations, so it doesn't make sense 
to add a flag for it. If we add a flag, every ShuffleManager will need to 
implement this feature. Instead we're trying to make the smallest interface 
that the code consuming this data needs, so that we can try multiple 
implementations of ShuffleManager and see which of these features work best.

{quote}
??When the shuffle has no Aggregator (i.e. null or None is passed in), keys and 
values are simply sent across the network. Optionally we might allow the 
ShuffleManager to specify whether keys read from a ShuffleReader are sorted, or 
add a flag to registerShuffle that requests this for keys that have an 
Ordering. This would simplify grouping operators downstream (e.g. cogroup).??
Does this mean that ordering is an inherent property of the input data, or that 
the ShuffleManager is expected to sort the data?
{quote}
The Ordering object means that keys are comparable. This flag here would be to 
tell the ShuffleManager to sort the data, so that downstream algorithms like 
joins can work more efficiently.
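
As an illustration of why sorted shuffle output helps downstream joins: when 
both inputs arrive sorted by key, an inner join can be done in a single forward 
pass without building a hash table. The sketch below is a generic merge join in 
Scala, not code from Spark; the name mergeJoin is hypothetical, and it assumes 
for brevity that each key appears at most once on each side.

```scala
object SortedJoinSketch {
  // Inner-joins two inputs that are already sorted by key (unique keys per side)
  // by advancing whichever side currently has the smaller key.
  def mergeJoin[K, A, B](left: Iterator[(K, A)], right: Iterator[(K, B)])(
      implicit ord: Ordering[K]): List[(K, (A, B))] = {
    val l = left.buffered
    val r = right.buffered
    val out = List.newBuilder[(K, (A, B))]
    while (l.hasNext && r.hasNext) {
      val c = ord.compare(l.head._1, r.head._1)
      if (c < 0) l.next()        // left key is smaller: skip it
      else if (c > 0) r.next()   // right key is smaller: skip it
      else {
        // Keys match: emit the joined record and advance both sides.
        val (k, a) = l.next()
        val (_, b) = r.next()
        out += ((k, (a, b)))
      }
    }
    out.result()
  }
}
```

The same Ordering that makes keys comparable is what lets the merge decide which 
side to advance.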

{quote}
2. Is it a goal to support prefetching of map data on the reducer side?
{quote}
Again, this might be done by some implementations of ShuffleManager.

{quote}
3. For ShuffleReader, why is only a partition range allowed? How about extending 
this API to support multiple individual partitions? For example, if the reducer 
knows that partitions 1,3,5 are ready while 2,4,6 are not, it can fetch 1,3,5 
first. Making one call instead of 3 calls to getReader can reduce mapper-side 
disk seek operations, e.g. if partitions 3,5 are contiguous on one node.
{quote}
The reducer code shouldn't have to worry about what order to fetch things in. 
Instead, when you request a range, the ShuffleManager implementation can decide 
which partitions to fetch first based on what's available. The idea was that 
some code in DAGScheduler decides on the number of reduce tasks and their 
partition ranges (by looking at the map output size for each partition) and 
then the ShuffleManager on each node fetches the right partitions. Ranges are 
simpler to deal with than arbitrary sets and more space-efficient to represent 
(e.g. imagine we had 100,000 map tasks).
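
A rough sketch of the kind of planning described above, in Scala: given the 
total map output size per reduce partition, consecutive partitions are grouped 
greedily into ranges of roughly a target size, and each range becomes one reduce 
task. The names PartitionRange and planRanges, the greedy policy, and the 
half-open range convention are assumptions for illustration only; the thread 
does not say how DAGScheduler would actually choose the ranges.

```scala
object RangePlanSketch {
  // Half-open range of reduce partitions: [start, end).
  final case class PartitionRange(start: Int, end: Int)

  // Greedily groups consecutive partitions, closing the current range when
  // adding the next partition would push it past targetBytes.
  def planRanges(partitionBytes: IndexedSeq[Long], targetBytes: Long): List[PartitionRange] = {
    val ranges = List.newBuilder[PartitionRange]
    var start = 0
    var acc = 0L
    for (i <- partitionBytes.indices) {
      if (i > start && acc + partitionBytes(i) > targetBytes) {
        ranges += PartitionRange(start, i)
        start = i
        acc = 0L
      }
      acc += partitionBytes(i)
    }
    // Close the final range, if there was any input at all.
    if (partitionBytes.nonEmpty) ranges += PartitionRange(start, partitionBytes.length)
    ranges.result()
  }
}
```

Each range is described by two integers no matter how many partitions it 
covers, whereas an arbitrary set needs one entry per partition.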

{quote}
4. I am not sure whether such a partition list or range should return one reader 
instance or multiple ones.
{quote}
It returns one reader that gathers and combines key-value pairs across all the 
partitions.



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-06 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020090#comment-14020090
 ] 

Matei Zaharia commented on SPARK-2044:
--

{quote}
Hi Matei, thanks for your reply. I will carefully read your doc and follow your 
work. So where should I start?
{quote}
I'm going to spend some time today completing some of the refactoring I started 
to do and then post it to my branch so you can work from there.



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-06 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020329#comment-14020329
 ] 

Matei Zaharia commented on SPARK-2044:
--

So BTW I think what I'll do is move over the current shuffle but without 
MapOutputTracker, then we can open another JIRA to move MapOutputTracker behind 
the hash shuffle implementation.



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-07 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14021115#comment-14021115
 ] 

Matei Zaharia commented on SPARK-2044:
--

Alright so I've posted my code at https://github.com/apache/spark/pull/1009. 
There are still two things missing:
* Moving MapOutputTracker behind this interface
* Moving aggregation into the ShuffleReaders and ShuffleWriters instead of 
having it inside RDD operations

Maybe we can open those as separate JIRAs and more people can work on them.



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-09 Thread Weihua Jiang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14021716#comment-14021716
 ] 

Weihua Jiang commented on SPARK-2044:
-

Hi Matei,

Thanks a lot for your reply.

1. I am confused by your position on the sorting flag. 
``The goal is to allow diverse shuffle implementations, so it doesn't make 
sense to add a flag for it. If we add a flag, every ShuffleManager will need to 
implement this feature. Instead we're trying to make the smallest interface 
that the code consuming this data needs, so that we can try multiple 
implementations of ShuffleManager and see which of these features work best.
The Ordering object means that keys are comparable. This flag here would be to 
tell the ShuffleManager to sort the data, so that downstream algorithms like 
joins can work more efficiently.``
In your first statement, it seems you want to keep the interface minimal, so no 
need-to-sort flag is allowed. But in your second statement, you allow the user 
to ask the ShuffleManager to sort the data. 
From my point of view, it is better to have such a flag so that the user can ask 
the ShuffleManager to sort. Operations like SQL "order by" can then be 
implemented more efficiently. ShuffleManager can provide a utility class for 
general sorting so that not every implementation needs to implement its own 
sorting logic.

2. I agree that, for ShuffleReader, reading a partition range is more efficient. 
However, if we want to break the barrier between the map and reduce stages, we 
will hit a situation where, when a reducer starts, not all of its partitions 
are ready. With a partition range, the reducer has to wait for all partitions 
to be ready before it can run. It is better if the reducer can start executing 
when some (not all) partitions are ready. The POC code can be found at 
https://github.com/lirui-intel/spark/tree/removeStageBarrier. This is why I 
think we need another read() function that takes a partition list instead of a 
range.  
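
A minimal sketch of the two read() shapes contrasted in point 2, assuming a toy 
in-memory reader. The names RangeOrListReader and InMemoryReader are 
hypothetical and are not taken from the design doc or from Spark; the 
half-open range convention is also an assumption.

```scala
// Hypothetical sketch: illustrative names only, not Spark's actual API.
trait RangeOrListReader[K, V] {
  /** Read a contiguous range of partitions, [startPartition, endPartition). */
  def read(startPartition: Int, endPartition: Int): Iterator[(K, V)]

  /** The additional overload under discussion: read an explicit partition list. */
  def read(partitions: Seq[Int]): Iterator[(K, V)]
}

/** Toy reader backed by a map from partition id to that partition's records. */
class InMemoryReader[K, V](data: Map[Int, Seq[(K, V)]])
    extends RangeOrListReader[K, V] {

  // A range is just a special case of a list of partition ids.
  def read(startPartition: Int, endPartition: Int): Iterator[(K, V)] =
    read(startPartition until endPartition)

  // Missing partitions are treated as empty in this toy version.
  def read(partitions: Seq[Int]): Iterator[(K, V)] =
    partitions.iterator.flatMap(p => data.getOrElse(p, Seq.empty[(K, V)]).iterator)
}
```

With this shape a caller could ask for partitions 0 and 2 while skipping 1, 
which a single range cannot express.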



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-09 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14025580#comment-14025580
 ] 

Matei Zaharia commented on SPARK-2044:
--

Hey Weihua,

I'll look into the sorting flag; I initially envisioned that the shuffle 
manager would just tell the calling code whether the data is sorted (otherwise 
it sorts it by itself), but maybe it does make sense to push sorting into the 
interface.
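
A minimal sketch of the first option described above, where the shuffle only 
reports whether its output is sorted and the calling code sorts when it is 
not. ShuffleOutput, isSortedByKey and SortedConsumer are hypothetical names 
used for illustration; they are not from the design doc or from Spark.

```scala
// Hypothetical sketch: illustrative names only, not Spark's actual API.
/** Records handed back by a shuffle, plus a flag saying whether they are sorted. */
final case class ShuffleOutput[K, V](records: Seq[(K, V)], isSortedByKey: Boolean)

object SortedConsumer {
  /** Returns records in key order, sorting only if the shuffle did not already. */
  def sortedRecords[K, V](out: ShuffleOutput[K, V])(implicit ord: Ordering[K]): Seq[(K, V)] =
    if (out.isSortedByKey) out.records // trust the shuffle's claim
    else out.records.sortBy(_._1)      // fall back to sorting in the caller
}
```

Pushing sorting into the interface would instead move the Ordering to the 
shuffle side, so the caller would never need the fallback branch.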

For the ranges on ShuffleReader, I think you misunderstood my meaning slightly. 
I don't *want* the reduction code (e.g. combineByKey or groupByKey) to even 
know that map tasks are running at different times. It should simply request 
its range of reduce partitions once, and then the shuffle *implementation* 
should see which maps are ready and start pulling from those. Note also that 
the partition range there is for reduce partitions (e.g. our job has 100 reduce 
partitions and we ask for partitions 2-5 because we decided to have just one 
reduce task for those). It's not for map IDs.
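
The distinction above can be sketched as follows, assuming a toy reader that 
is handed whichever map outputs are currently ready. MapOutput and 
ToyShuffleReader are hypothetical names, and the half-open range [start, end) 
is an assumption of this sketch, so partitions 2-5 are requested as 
read(2, 6).

```scala
// Hypothetical sketch: illustrative names only, not Spark's actual classes.
/** Output of one map task: records bucketed by reduce partition id. */
final case class MapOutput[K, V](mapId: Int, byReducePartition: Map[Int, Seq[(K, V)]])

/** Toy reader: the caller names only reduce partitions, never map ids. */
class ToyShuffleReader[K, V](readyMaps: Seq[MapOutput[K, V]]) {
  /** Read reduce partitions [startPartition, endPartition) from every ready map. */
  def read(startPartition: Int, endPartition: Int): Iterator[(K, V)] =
    for {
      m  <- readyMaps.iterator                           // chosen by the implementation
      p  <- (startPartition until endPartition).iterator // chosen by the caller
      kv <- m.byReducePartition.getOrElse(p, Seq.empty[(K, V)]).iterator
    } yield kv
}
```

The reduction code calls read once with its reduce partition range; which map 
outputs are pulled, and when, stays an internal decision of the reader.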



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-09 Thread Weihua Jiang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14026109#comment-14026109
 ] 

Weihua Jiang commented on SPARK-2044:
-

Hi Matei,

Thanks for the reply. 

I am glad that you think pushing sorting into the interface is useful.

Yes, you are right. I confused the partition id with the map id. I am totally 
OK with the partition id range.



[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-26 Thread Raymond Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1402#comment-1402
 ] 

Raymond Liu commented on SPARK-2044:


Hi [~matei]

I am wondering whether we should hide the shuffleBlockManager behind 
ShuffleManager to better decouple diskBlockManager / BlockManager from 
ShuffleBlockManager. It would also help when someone writes a new 
ShuffleManager that has its own shuffle block management strategy, e.g. a 
sort-based shuffle manager.
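
A minimal sketch of the decoupling suggested above, assuming each shuffle 
manager exposes its own block-lookup strategy. BlockResolver, 
PluggableShuffleManager, both manager objects and the file-name formats are 
hypothetical and chosen only for illustration; they are not Spark's API.

```scala
// Hypothetical sketch: illustrative names and file layouts, not Spark's actual API.
/** Resolves a shuffle block to a storage location (here, just a string). */
trait BlockResolver {
  def locate(shuffleId: Int, mapId: Int, reduceId: Int): String
}

/** Each shuffle manager owns its block-management strategy. */
trait PluggableShuffleManager {
  def blockResolver: BlockResolver
}

/** Hash-style layout: one file per (map, reduce) pair. */
object HashStyleManager extends PluggableShuffleManager {
  val blockResolver: BlockResolver = new BlockResolver {
    def locate(shuffleId: Int, mapId: Int, reduceId: Int): String =
      s"shuffle_${shuffleId}_${mapId}_${reduceId}"
  }
}

/** Sort-style layout: one file per map task; a reducer reads one segment of it. */
object SortStyleManager extends PluggableShuffleManager {
  val blockResolver: BlockResolver = new BlockResolver {
    def locate(shuffleId: Int, mapId: Int, reduceId: Int): String =
      s"shuffle_${shuffleId}_${mapId}.data#segment$reduceId"
  }
}
```

Code outside the shuffle package would then depend only on the manager, not 
on how a particular implementation lays out its blocks.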

I have filed a JIRA ticket at 
https://issues.apache.org/jira/browse/SPARK-2288; would you mind taking a look 
at it? 




[jira] [Commented] (SPARK-2044) Pluggable interface for shuffles

2014-06-26 Thread Raymond Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045447#comment-14045447
 ] 

Raymond Liu commented on SPARK-2044:


Hi [~matei], there is also a pull request for the above JIRA at 
https://github.com/apache/spark/pull/1241; would you mind taking a look at 
it?
