[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2016-03-21 Thread Thomas Graves (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205279#comment-15205279
 ] 

Thomas Graves commented on SPARK-1239:
--

I do like the broadcast idea. When I originally tried it I hit the issue mentioned in the second bullet point, but as long as we synchronize on the requests so that we only broadcast it once (see the sketch below), we should be ok.
It does come with some additional constraints, though. With a sufficiently large job I don't think it matters, but if we only have a small number of reducers we end up broadcasting to all executors when only a couple actually need the data. I guess that doesn't hurt much unless the other executors start pulling chunks from the executors your reducers are on and add load to them; the impact should be pretty minimal.
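
The kind of synchronization I mean, as a rough illustrative sketch only (this helper and its names are made up, not the actual code):
{code:scala}
import scala.collection.mutable
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Illustrative helper: guard broadcast creation so that concurrent map status
// requests for the same shuffle trigger exactly one broadcast.
class MapStatusBroadcaster(sc: SparkContext) {
  private val cache = mutable.Map.empty[Int, Broadcast[Array[Byte]]]

  def getOrBroadcast(shuffleId: Int, serialize: () => Array[Byte]): Broadcast[Array[Byte]] =
    cache.synchronized {
      // The first caller serializes and broadcasts; everyone else reuses the result.
      cache.getOrElseUpdate(shuffleId, sc.broadcast(serialize()))
    }
}
{code}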
Broadcast also seems to make less sense with dynamic allocation. At least I've seen issues where executors go away, the fetch from that executor fails, it has to retry, etc., adding extra time. We recently fixed one specific issue with this so that it goes and gets the locations again after a certain number of failures. That retry time should be smaller now that we fixed it, but I'll have to run the numbers.

I'll do some more analysis/testing of this and see if that really matters.  

With a sufficient number of threads I don't think a few slow nodes would make much of a difference here; if you have that many slow nodes, the shuffle itself is going to be impacted, which I would see as the larger effect. The slow nodes could just as well affect the broadcast too. Hopefully they get skipped since it takes them longer to receive a chunk, but it's possible that once a slow node has a chunk or two, more and more executors start fetching the broadcast data from it instead of from the driver, slowing down more transfers.

But it's a good point that my current method would truly block (for a certain time) rather than just being slow. Note that there is a timeout on waiting for the send to happen; when it fires, the connection is closed and the executor retries. You don't have to worry about that with broadcast.

I'll do some more analysis with that approach.

I wish Netty had some other built-in mechanisms for flow control.

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>Assignee: Thomas Graves
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2016-03-21 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204939#comment-15204939
 ] 

Mridul Muralidharan commented on SPARK-1239:


[~tgraves] For the last part (the waiting bit) - why not set the threshold at which you switch from direct serialization to broadcast so that the problem 'goes away'? In my case I was using a fairly high number, but nothing stops us from using, say, 1 MB, which means the number of outstanding requests needed to cause a memory issue becomes so high that it is practically impossible (see the sketch below).
In general, I don't like waiting for I/O to complete: different nodes can be under different loads, and over time slow nodes holding up the response can keep the driver from responding to the fast nodes.
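
Something along these lines (the object name and the two send callbacks are placeholders, not Spark APIs):
{code:scala}
// Sketch of the threshold idea: reply inline for small payloads, hand back only
// a broadcast reference above the cutoff.
object MapStatusReply {
  val broadcastThreshold: Int = 1 * 1024 * 1024 // e.g. 1 MB, as suggested above

  def answer(serializedStatuses: Array[Byte],
             sendDirect: Array[Byte] => Unit,
             sendViaBroadcast: Array[Byte] => Unit): Unit = {
    if (serializedStatuses.length <= broadcastThreshold) {
      sendDirect(serializedStatuses)        // small: inline in the RPC response
    } else {
      sendViaBroadcast(serializedStatuses)  // large: executors pull the broadcast chunks
    }
  }
}
{code}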

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>Assignee: Thomas Graves
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2016-03-21 Thread Thomas Graves (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204900#comment-15204900
 ] 

Thomas Graves commented on SPARK-1239:
--

So I have been looking at this and testing a few changes out.

There are a few issues here, but if we are looking at solving the driver memory bloat then I think it comes down to a flow control problem. The driver tries to respond to all the map status requests at once and shoves them out to Netty faster than Netty can send them, so we end up using a lot of memory very quickly. You can try to reduce the size of the MapStatuses, but only to a point, and you could still hit this issue.

There are multiple possible ways to solve this. The approach I have been looking at is giving the MapOutputTracker its own queue and thread pool for handling requests. This gives us the flexibility to do several things:

- We can make the reply synchronous (i.e. wait for the response from Netty before starting the next reply) without blocking the normal dispatcher threads that handle things like heartbeats, which gives us flow control. We can decide to do this only when the map output statuses are above a certain size, or do it all the time, and the thread pool size can be adjusted to handle more replies in parallel. This could be made more sophisticated in the future if we want some sort of send queue rather than blocking each thread.
- We can easily synchronize incoming requests without blocking dispatcher threads so we don't serialize the same MapStatuses multiple times. Background: another problem I've been seeing is that a bunch of map status requests come in at once, we have a lot of dispatcher threads running in parallel, all of them check whether the map status is cached, all of them find it isn't, and you end up with multiple threads serializing the exact same map output statuses.
- It doesn't tie us to sending map statuses with the task data, i.e. if we want to change Spark in the future to start reducer tasks before all map tasks finish (MapReduce does this now), this works more easily with that.
 
I still need to do some more testing on this, but I wanted to see what people think of this approach.

What I have implemented right now is a queue and thread pool in the MapOutputTracker to handle the requests. If the serialized statuses are over 5 MB (still deciding on this size), the reply waits for the send to actually complete before the next request is picked up.
For the second bullet above I took a somewhat simpler approach for now: when registerMapOutputs is called, the map status output is cached then rather than waiting for a request to come in. This helps because it ensures the last one is cached, but if you have multiple shuffles the others still won't be in the cache. We could either cache more, or take the approach I mention above and just synchronize and cache once upon the first request.
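
The shape of what I'm describing looks roughly like this (a simplified sketch only, not the actual patch; serializeStatuses stands in for getSerializedMapOutputStatuses and the send callback is assumed to block until Netty has written the reply):
{code:scala}
import java.util.concurrent.{Executors, LinkedBlockingQueue}
import scala.collection.mutable

// Simplified sketch: a dedicated queue served by a small thread pool, replies
// sent one at a time per worker for back-pressure, and serialized statuses
// cached so each shuffle is serialized only once.
class MapStatusRequestHandler(poolSize: Int) {

  // `send` is assumed to block until the transport has accepted the bytes.
  case class Request(shuffleId: Int, send: Array[Byte] => Unit)

  private val queue = new LinkedBlockingQueue[Request]()
  private val cache = mutable.Map.empty[Int, Array[Byte]]
  private val pool  = Executors.newFixedThreadPool(poolSize)

  def post(req: Request): Unit = queue.put(req)

  (1 to poolSize).foreach { _ =>
    pool.submit(new Runnable {
      override def run(): Unit = while (true) {
        val req = queue.take()
        // Serialize at most once per shuffle, even under concurrent requests.
        val bytes = cache.synchronized {
          cache.getOrElseUpdate(req.shuffleId, serializeStatuses(req.shuffleId))
        }
        // Waiting here for the send to finish means at most `poolSize` large
        // replies are ever buffered; in the real change this wait would only
        // apply above the size threshold (e.g. 5 MB).
        req.send(bytes)
      }
    })
  }

  // Placeholder for MapOutputTrackerMaster.getSerializedMapOutputStatuses.
  private def serializeStatuses(shuffleId: Int): Array[Byte] = Array.empty[Byte]
}
{code}
The back-pressure comes entirely from the workers waiting on the send; the normal dispatcher threads only ever enqueue requests.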

One of the large jobs I'm using to test this shuffles 15 TB of data with 202,000 map tasks going down to 500 reducers. The driver originally needed 20 GB of memory; with my changes I was able to run it successfully with 5 GB.

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>Assignee: Thomas Graves
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2016-02-03 Thread Daniel Darabos (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15130351#comment-15130351
 ] 

Daniel Darabos commented on SPARK-1239:
---

I've read an interesting article about the "Kylix" butterfly allreduce 
(http://www.cs.berkeley.edu/~jfc/papers/14/Kylix.pdf). I think this is a direct 
solution to this problem and the authors say integration with Spark should be 
"easy".

Perhaps the same approach could be simulated within the current Spark shuffle implementation. I think the idea is to break up the M*R shuffle into an M*K and a K*R shuffle, where K is much smaller than M or R. Those K intermediate partitions will be large, but that should be fine.
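
To put rough numbers on it (my own arithmetic, not from the paper): with M = R = 200,000 and an intermediate fan-in of K = 2,000, the tracked block count drops from M*R = 4*10^10 to M*K + K*R = 8*10^8, about a 50x reduction in shuffle metadata, at the cost of moving the data through an extra shuffle stage.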

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>Assignee: Thomas Graves
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2015-12-02 Thread Thomas Graves (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036341#comment-15036341
 ] 

Thomas Graves commented on SPARK-1239:
--

I have another user hitting this as well. The comment above mentions other issues that need to be addressed in the MapOutputTracker; do you have links to those other issues?

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2015-11-04 Thread Alex Slusarenko (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989638#comment-14989638
 ] 

Alex Slusarenko commented on SPARK-1239:


> How is this not a major issue for everyone?
Daniel, I had exactly the same thoughts, but I found out that most Spark users have small data; most need only about 10 servers. I was even more surprised when I had to fix SPARK-6246 myself - Spark was not able to launch more than 100 instances on Amazon.

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2015-11-03 Thread Daniel Darabos (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14987046#comment-14987046
 ] 

Daniel Darabos commented on SPARK-1239:
---

I can also add some data. I have a ShuffleMapStage with 82,714 tasks and then a 
ResultStage with 222,609 tasks. The driver cannot serialize this:

{noformat}
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271) ~[na:1.7.0_79]
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) ~[na:1.7.0_79]
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.7.0_79]
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) ~[na:1.7.0_79]
at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) ~[na:1.7.0_79]
at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) ~[na:1.7.0_79]
at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:146) ~[na:1.7.0_79]
at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1893) ~[na:1.7.0_79]
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1874) ~[na:1.7.0_79]
at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1821) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:718) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.close(ObjectOutputStream.java:739) ~[na:1.7.0_79]
at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:362) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1294) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:361) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:312) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
at org.apache.spark.MapOutputTrackerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(MapOutputTracker.scala:49) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
{noformat}

I see {{getSerializedMapOutputStatuses}} has changed a lot since 1.4.0, but it still returns an array whose size is proportional to _M * R_. How can this be part of a scalable system? How is this not a major issue for everyone? Am I doing something wrong?

I'm now thinking that maybe if you have an overwhelming majority of empty or 
non-empty blocks, the bitmap will compress very well. But it's possible that I 
am ending up with a relatively even mix of empty and non-empty blocks, killing 
the compression. I have about 40 billion lines, _M * R_ is about 20 billion, so 
this seems plausible.
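
(For scale: about 40 billion lines spread over roughly 20 billion blocks is only around two lines per block on average, so a sizeable mix of empty and non-empty blocks does seem likely, and that is the worst case for the bitmap.)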

It's also possible that I should have larger partitions. Given the processing I do, that's not possible -- it leads to the executors OOMing. But larger partitions would not be a scalable solution anyway. If _M_ and _R_ are reasonable now with some number of lines per partition, then when your data size doubles they will also double, and _M * R_ will quadruple. At some point the number of lines per map output will be low enough that compression becomes ineffective.

I see https://issues.apache.org/jira/browse/SPARK-11271 has recently decreased the map status size by 20%. Since the status size grows with _M * R_, i.e. roughly quadratically in the data size, that means in Spark 1.6 I will be able to process 1/sqrt(0.8), or about 12%, more data than now. The way I understand the situation, the improvement required is orders of magnitude larger than that. I'm currently hitting this issue with 5 TB of input. If I tried processing 5 PB, the map status would be a million times larger.

I like the premise of this JIRA ticket of not building the map status table in 
the first place. But a colleague of mine asks if perhaps we could even avoid 
tracking this data in the driver. If the driver just provided the reducers with 
the list of mappers they could each just ask the mappers directly for the list 
of blocks they should fetch.

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 




[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2015-07-29 Thread Alex Slusarenko (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645913#comment-14645913
 ] 

Alex Slusarenko commented on SPARK-1239:


Hi all. We have hit this issue many times. Currently we have 250,000 map tasks and the same number of reduce tasks, on 200 slave nodes; the driver has 80 GB of RAM. First we observed the Akka frame size limit exception, and after increasing the limit we see an OOM. Here is the corresponding part of the log:
{noformat}
...
15/07/27 17:22:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 262144 tasks
15/07/27 17:22:57 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 233766, 10.47.190.240, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 233767, 10.145.26.133, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 233768, 10.51.191.206, PROCESS_LOCAL, 1215 bytes)
...
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3197.0 in stage 1.0 (TID 236963, 10.99.197.178, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3198.0 in stage 1.0 (TID 236964, 10.65.148.16, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3199.0 in stage 1.0 (TID 236965, 10.123.204.224, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.145.30.250:38441 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.140.170.222:35810 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.7.205.149:43761 (size: 3.8 KB, free: 4.1 GB)
...
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.165.146.7:37388 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.153.254.79:49517 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.95.198.154:53675 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:24:41 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 166509346 bytes
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.109.157.235:39740
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.166.156.78:59382
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.152.41.131:47968
...
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.140.253.251:44621
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.153.254.79:42648
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.169.230.246:45473
15/07/27 17:25:31 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.146.43.5:49989 in memory (size: 3.2 KB, free: 3.4 GB)
15/07/27 17:27:25 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-47] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
at akka.remote.EndpointWriter.writeSend(End
{noformat}

[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2015-03-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14352377#comment-14352377
 ] 

Mridul Muralidharan commented on SPARK-1239:


Hitting the Akka frame size limit for the MapOutputTracker is very easy, since we fetch the whole output (m * r). While I can't get into the specifics of our jobs or share logs, it is easy to see this hitting 1 GB for 100k mappers and 50k reducers.
If this is not being looked into currently, I can add it to my list of things to fix - but if there is already work being done, I don't want to duplicate it.
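(As a rough sanity check of that 1 GB figure, assuming each map status carries on the order of one byte of size information per reducer: 100k maps * 50k reducers is about 5 * 10^9 bytes of raw status data before compression, so a serialized payload approaching 1 GB is entirely plausible.)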

Even something trivial like what was done for task results would suffice (if we don't want the additional overhead of per-reduce map output generation at the master).

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2015-03-08 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14352356#comment-14352356
 ] 

Patrick Wendell commented on SPARK-1239:


It would be helpful if any users who have observed this could comment on the 
JIRA and give workload information. This has been more on the back burner since 
we've heard few reports of it on the mailing list, etc...

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2015-03-08 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14352306#comment-14352306
 ] 

Kostas Sakellis commented on SPARK-1239:


How many reduce-side tasks do you have? Can you please attach your logs that show the OOM errors?

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2015-03-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14352284#comment-14352284
 ] 

Mridul Muralidharan commented on SPARK-1239:


[~pwendell] Is there any update on this? This is hitting us fairly regularly, and we are already at 1 GB for the frame size now...


> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-10-23 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14182122#comment-14182122
 ] 

Patrick Wendell commented on SPARK-1239:


Hey Kostas - there are a few other bugs that require a refactoring to fix, and that refactoring will subsume this issue, so I think that's why [~joshrosen] is grabbing it. This change will involve fairly significant surgery to a very complex part of Spark, so it might not be the best beginner issue. If Josh's other changes don't end up fixing this, we can leave this open as an independent issue and see if it can be done surgically.

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>Assignee: Josh Rosen
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-10-23 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14182110#comment-14182110
 ] 

Kostas Sakellis commented on SPARK-1239:


Apologies for not commenting on this JIRA sooner; since I'm new to Spark, it has taken a bit of time to wrap my head around how the different schedulers (DAG and Task) interoperate. I'm currently investigating completely removing the MapOutputTracker class. Instead, the DAGScheduler can push the required map statuses down to the next stage. I'm thinking of modifying DAGScheduler.submitMissingTasks(..) to take in (or query) the map statuses of the previously completed stage. When a new ShuffleMapTask gets created, we pass in the filtered map statuses needed by that task. The map status data can be stored in the TaskContext and used in the BlockStoreShuffleFetcher when block data is being read (which currently uses the MapOutputTracker). What I'm investigating now is how to filter the data when I create the ShuffleMapTask - the BlockStoreShuffleFetcher uses a shuffleId, which I don't seem to have access to in the DAGScheduler.

I haven't yet written any code to test this out so if anyone has any concerns 
please let me know. Also, [~joshrosen] pointed out that the size of the map 
output structure even after it has been filtered could still be very large. 
This is worth investigating.
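
For clarity, the kind of per-task filtering I have in mind looks roughly like this (an untested sketch; MapStatusLike is a stand-in for Spark's internal MapStatus):
{code:scala}
// Illustrative sketch of pushing down only what one task needs.
object FilteredStatuses {
  final case class MapStatusLike(host: String, port: Int, sizesByReducer: Array[Long])

  // For one reduce partition, keep only (mapId, host, port, size) for the
  // non-empty blocks that partition will actually fetch.
  def forReducer(statuses: Array[MapStatusLike], reduceId: Int): Array[(Int, String, Int, Long)] =
    statuses.zipWithIndex.collect {
      case (s, mapId) if s.sizesByReducer(reduceId) > 0 =>
        (mapId, s.host, s.port, s.sizesByReducer(reduceId))
    }
}
{code}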

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>Assignee: Josh Rosen
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-10-08 Thread DB Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163423#comment-14163423
 ] 

DB Tsai commented on SPARK-1239:


+1, we run into this issue as well.

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>Assignee: Kostas Sakellis
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-09-15 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134198#comment-14134198
 ] 

Patrick Wendell commented on SPARK-1239:


Yes, the current state of the art is to just increase the frame size.
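
For anyone hitting this in the meantime, the workaround looks something like the following ({{spark.akka.frameSize}} is specified in MB; 128 is just an example value and the same key can also be passed with --conf on spark-submit):
{code:scala}
import org.apache.spark.SparkConf

object FrameSizeWorkaround {
  // Raise the Akka frame size (value in MB) so the serialized map output
  // statuses fit in a single reply; 128 is only an example.
  val conf: SparkConf = new SparkConf()
    .setAppName("large-shuffle-job")
    .set("spark.akka.frameSize", "128")
}
{code}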

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>Assignee: Kostas Sakellis
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-09-15 Thread Andrew Ash (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134160#comment-14134160
 ] 

Andrew Ash commented on SPARK-1239:
---

For large statuses, would we expect that to exceed {{spark.akka.frameSize}} and 
cause the below exception?

{noformat}
2014-09-14T01:34:21.305 ERROR [spark-akka.actor.default-dispatcher-4] 
org.apache.spark.MapOutputTrackerMasterActor - Map output statuses were 
13920119 bytes which exceeds spark.akka.frameSize (10485760 bytes).
{noformat}

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Patrick Wendell
>Assignee: Kostas Sakellis
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-09-08 Thread Andrew Or (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14126215#comment-14126215
 ] 

Andrew Or commented on SPARK-1239:
--

I have reassigned it to you Kostas.

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Patrick Wendell
>Assignee: Kostas Sakellis
> Fix For: 1.1.0
>
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-09-05 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14123883#comment-14123883
 ] 

Kostas Sakellis commented on SPARK-1239:


[~pwendell] I'd like to take a crack at this since it is affecting one of our 
customers. 

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Patrick Wendell
>Assignee: Andrew Or
> Fix For: 1.1.0
>
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-09-01 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117914#comment-14117914
 ] 

Patrick Wendell commented on SPARK-1239:


I think [~andrewor] has this in his backlog but it's not actively being worked 
on. [~bcwalrus] do you or [~sandyr] want to take a crack?

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Patrick Wendell
>Assignee: Andrew Or
> Fix For: 1.1.0
>
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 


