[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205279#comment-15205279 ] Thomas Graves commented on SPARK-1239: -- I do like the idea of broadcast, and when I originally tried it I hit the issue mentioned in the second bullet point, but as long as we synchronize on the requests so we only broadcast once, we should be OK. It does seem to have some further constraints, though. With a sufficiently large job I don't think it matters, but what if we only have a small number of reducers? We broadcast to all executors when only a couple need it. I guess that doesn't hurt much unless the other executors start going to the executors your reducers are on and add more load to them; that should be pretty minimal, though. Broadcast also seems to make less sense with dynamic allocation. At least I've seen issues when executors go away: the fetch from that one fails, has to retry, etc., adding additional time. We recently fixed one specific issue with this, to go get locations again after a certain number of failures. That time should be less now that we fixed that, but I'll have to run the numbers. I'll do some more analysis/testing and see whether that really matters. With a sufficient number of threads I don't think a few slow nodes would make much of a difference here; if you have that many slow nodes, the shuffle itself is going to be impacted, which I would see as the larger effect. The slow nodes could just as well affect the broadcast. Hopefully you skip those as it takes longer for them to serve a chunk, but it's possible that once a slow node has a chunk or two, more and more executors start going to it for the broadcast data instead of the driver, slowing down more transfers. But it's a good point, and my current method would truly block (for a certain time) rather than just being slow.
Note that there is a timeout on waiting for the send to happen; when it fires, the connection is closed and the executor retries. You don't have to worry about that with broadcast. I'll do some more analysis with that approach. I wish Netty had some other built-in mechanisms for flow control.

> Don't fetch all map output statuses at each reducer during shuffles
> ---
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Affects Versions: 1.0.2, 1.1.0
> Reporter: Patrick Wendell
> Assignee: Thomas Graves
>
> Instead we should modify the way we fetch map output statuses to take both a
> mapper and a reducer - or we should just piggyback the statuses on each task.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
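The broadcast-once idea debated above hinges on serializing the statuses a single time no matter how many executors ask at once. A minimal sketch of that guard (Python for illustration; the class and parameter names are hypothetical, not Spark's actual API):

```python
import threading

class MapStatusBroadcaster:
    """Hypothetical sketch: serialize the map statuses for a shuffle at
    most once, even when many executors request them concurrently."""

    def __init__(self, serialize_fn):
        self._serialize_fn = serialize_fn   # e.g. gzip-compressed statuses
        self._cache = {}                    # shuffle_id -> serialized bytes
        self._lock = threading.Lock()

    def get(self, shuffle_id, statuses):
        with self._lock:                    # only one thread serializes
            if shuffle_id not in self._cache:
                self._cache[shuffle_id] = self._serialize_fn(statuses)
            return self._cache[shuffle_id]
```

The lock makes concurrent requesters wait for the first serialization instead of each producing their own copy, which is the duplicate-serialization problem described elsewhere in this thread.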
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204939#comment-15204939 ] Mridul Muralidharan commented on SPARK-1239: [~tgraves] For the last part (the waiting bit) - why not set the threshold where you use broadcast instead of direct serialization such that the problem 'goes away'? For my case I was using a fairly high number, but nothing stops us from using, say, 1 MB - which means the number of outstanding requests that would cause a memory issue becomes extremely high, to the point of not being practically possible. In general, I don't like the point about waiting for IO to complete - different nodes might have different loads, which can cause the driver not to respond to fast nodes because slow nodes cause the response not to be sent (over time).
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204900#comment-15204900 ] Thomas Graves commented on SPARK-1239: -- So I have been looking at this and testing a few changes. There are a few issues here, but if we are looking at solving the driver memory bloat, then I think this comes down to a flow-control problem. The driver tries to respond to all the map status requests and shoves them out to Netty quicker than Netty can send them, so we end up using a lot of memory very quickly. Yes, you can try to reduce the size of the MapStatuses, but you can only do that to a point and you could still hit this issue. There are multiple possible ways to solve this. The approach I have been looking at is giving the MapOutputTracker its own queue and thread pool for handling requests. This gives us the flexibility to do multiple things:
- We can make the reply synchronous (i.e. it waits for the response from Netty before starting the next reply) without blocking the normal dispatcher threads, which do things like handle heartbeats, thus giving us flow control. We can decide to do this only if the map output statuses are above a certain size, or do it all the time. The thread pool size can be adjusted to handle more in parallel, and this could be made more sophisticated in the future if we want some sort of send queue rather than blocking each thread.
- We can easily synchronize incoming requests without blocking dispatcher threads, so we don't serialize the same MapStatus multiple times. Background: one other problem I've been seeing is that a bunch of requests for map statuses come in at once, we have a lot of dispatcher threads running in parallel, all of them check whether the map status is cached, all of them find it's not, and multiple threads end up serializing the exact same map output statuses.
- It doesn't limit us to sending map statuses with the task data.
I.e. if we want to change Spark in the future to start reducer tasks before all map tasks finish (MapReduce does this now), this works more easily with that. I still need to do some more testing, but I wanted to see what people thought of this approach. What I have implemented right now is a queue and thread pool in the MapOutputTracker to handle the requests; if a reply is over 5 MB (still deciding on this size), it waits for the reply to actually send before grabbing the next request. For the second bullet above I took a somewhat simpler approach for now: when registerMapOutputs is called, I have it cache the serialized map statuses then, instead of waiting for a request to come in. This helps because it makes sure the last one is cached, but if you have multiple shuffles the others still won't be in the cache. We could either have it cache more, or take an approach like I mention above and have it synchronize and cache on the first request. One of the large jobs I'm using to test this shuffles 15 TB of data using 202,000 map tasks going down to 500 reducers. The driver originally used 20 GB of memory; with my changes I was able to run it successfully with 5 GB.
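The queue-plus-thread-pool approach above can be sketched roughly as follows (Python for illustration; all names and the 5 MB threshold are taken loosely from the comment, not from actual Spark code):

```python
import queue
import threading

LARGE_REPLY_BYTES = 5 * 1024 * 1024   # hypothetical threshold from the comment

class StatusRequestHandler:
    """Sketch: a dedicated queue and worker pool for map-status requests.
    Large replies are waited on synchronously, so a slow transfer blocks
    only its own worker, not the dispatcher threads handling heartbeats."""

    def __init__(self, num_workers, send_async, wait_for_send):
        self._requests = queue.Queue()
        self._send_async = send_async          # hand bytes to the transport
        self._wait_for_send = wait_for_send    # block until the send drains
        for _ in range(num_workers):
            threading.Thread(target=self._loop, daemon=True).start()

    def submit(self, payload):
        self._requests.put(payload)

    def _loop(self):
        while True:
            payload = self._requests.get()
            handle = self._send_async(payload)
            if len(payload) > LARGE_REPLY_BYTES:
                self._wait_for_send(handle)    # flow control for big replies
            self._requests.task_done()
```

The design choice is that back-pressure is applied per worker: tuning `num_workers` trades parallelism against how much serialized data can be in flight at once.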
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15130351#comment-15130351 ] Daniel Darabos commented on SPARK-1239: --- I've read an interesting article about the "Kylix" butterfly allreduce (http://www.cs.berkeley.edu/~jfc/papers/14/Kylix.pdf). I think this is a direct solution to this problem, and the authors say integration with Spark should be "easy". Perhaps the same approach could be simulated within the current Spark shuffle implementation. I think the idea is to break up the M*R shuffle into an M*K and a K*R shuffle, where K is much less than M or R. Those K partitions will be large, but that should be fine.
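As a rough illustration of why the two-level split helps the metadata problem, here is the block-count arithmetic using the stage sizes reported elsewhere in this thread and an arbitrary, hypothetical K:

```python
# Map-status entries to track: a direct M*R shuffle vs. a two-level
# M*K + K*R split, using stage sizes reported in this thread
# (82,714 maps, 222,609 reduces) and a hypothetical K = 1,000.
M, R, K = 82_714, 222_609, 1_000

direct = M * R                 # ~18.4 billion (mapper, reducer) blocks
two_level = M * K + K * R      # ~305 million blocks across both stages

print(direct, two_level, direct // two_level)
```

With these (assumed) numbers the two-level scheme tracks roughly 60x fewer block entries, at the cost of an extra pass over the data.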
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036341#comment-15036341 ] Thomas Graves commented on SPARK-1239: -- I have another user hitting this as well. The above mentions other issues that needed to be addressed in the MapOutputTracker; do you have links to those other issues?
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989638#comment-14989638 ] Alex Slusarenko commented on SPARK-1239:
> How is this not a major issue for everyone?
Daniel, I had exactly the same thoughts, but I found out that most Spark users have small data. Most users need about 10 servers. I was even more surprised when I had to fix SPARK-6246 myself: Spark was not able to launch more than 100 instances on Amazon.
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14987046#comment-14987046 ] Daniel Darabos commented on SPARK-1239: --- I can also add some data. I have a ShuffleMapStage with 82,714 tasks and then a ResultStage with 222,609 tasks. The driver cannot serialize this:
{noformat}
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271) ~[na:1.7.0_79]
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) ~[na:1.7.0_79]
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.7.0_79]
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) ~[na:1.7.0_79]
        at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) ~[na:1.7.0_79]
        at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) ~[na:1.7.0_79]
        at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:146) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1893) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1874) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1821) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:718) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream.close(ObjectOutputStream.java:739) ~[na:1.7.0_79]
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:362) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1294) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
        at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:361) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
        at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:312) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
        at org.apache.spark.MapOutputTrackerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(MapOutputTracker.scala:49) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
{noformat}
I see {{getSerializedMapOutputStatuses}} has changed a lot since 1.4.0, but it still returns an array sized proportionally to _M * R_. How can this be part of a scalable system? How is this not a major issue for everyone? Am I doing something wrong? I'm now thinking that maybe if you have an overwhelming majority of empty or non-empty blocks, the bitmap will compress very well. But it's possible that I am ending up with a relatively even mix of empty and non-empty blocks, killing the compression. I have about 40 billion lines and _M * R_ is about 20 billion, so this seems plausible. It's also possible that I should have larger partitions. Due to the processing I do, that's not possible -- it leads to the executors OOMing. But larger partitions would not be a scalable solution anyway. If _M_ and _R_ are reasonable now with some number of lines per partition, then when your data size doubles they will also double, and _M * R_ will quadruple. At some point the number of lines per map output will be low enough that compression becomes ineffective. I see https://issues.apache.org/jira/browse/SPARK-11271 has recently decreased the map status size by 20%. That means in Spark 1.6 I will be able to process 1/sqrt(0.8), or about 12%, more data than now. The way I understand the situation, the improvement required is orders of magnitude larger than that. I'm currently hitting this issue with 5 TB of input; if I tried processing 5 PB, the map status would be a million times larger. I like the premise of this JIRA ticket of not building the map status table in the first place. But a colleague of mine asks if perhaps we could even avoid tracking this data in the driver.
If the driver just provided the reducers with the list of mappers, they could each ask the mappers directly for the list of blocks they should fetch.
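A back-of-the-envelope check of the scaling argument above, using the stage sizes from this comment (one bit per block is the theoretical floor for the empty/non-empty bitmap before compression):

```python
# Even at one bit per (mapper, reducer) pair, an M*R empty/non-empty
# bitmap is large once the blocks stop being overwhelmingly empty and
# the bitmap stops compressing. Stage sizes from the comment above.
M, R = 82_714, 222_609

bits = M * R                     # one bit per shuffle block
raw_gib = bits / 8 / 2**30
print(round(raw_gib, 1))         # ~2.1 GiB uncompressed, for one shuffle

# Doubling the data doubles both M and R, quadrupling the bitmap.
```

This is why compression ratio, not constant-factor savings, dominates here: the uncompressed footprint grows quadratically with data size.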
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645913#comment-14645913 ] Alex Slusarenko commented on SPARK-1239: Hi, all. We have faced this issue many times. Currently we have 250,000 map tasks and the same number of reduce tasks, on 200 slave nodes. The driver has 80 GB of RAM. First we observed the akka frame size limit exception, and after increasing the limit we see an OOM. Here is the corresponding part of the log:
{noformat}
...
15/07/27 17:22:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 262144 tasks
15/07/27 17:22:57 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 233766, 10.47.190.240, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 233767, 10.145.26.133, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 233768, 10.51.191.206, PROCESS_LOCAL, 1215 bytes)
...
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3197.0 in stage 1.0 (TID 236963, 10.99.197.178, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3198.0 in stage 1.0 (TID 236964, 10.65.148.16, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3199.0 in stage 1.0 (TID 236965, 10.123.204.224, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.145.30.250:38441 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.140.170.222:35810 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.7.205.149:43761 (size: 3.8 KB, free: 4.1 GB)
...
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.165.146.7:37388 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.153.254.79:49517 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.95.198.154:53675 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:24:41 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 166509346 bytes
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.109.157.235:39740
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.166.156.78:59382
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.152.41.131:47968
...
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.140.253.251:44621
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.153.254.79:42648
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.169.230.246:45473
15/07/27 17:25:31 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.146.43.5:49989 in memory (size: 3.2 KB, free: 3.4 GB)
15/07/27 17:27:25 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-47] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
        at akka.remote.EndpointWriter.writeSend(End
{noformat}
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14352377#comment-14352377 ] Mridul Muralidharan commented on SPARK-1239: Hitting the akka frame size for the map output tracker is very easy, since we fetch the whole output (m * r) - while I can't get into specifics of our jobs or share logs, it is easy to see this hitting 1G for 100k mappers and 50k reducers. If this is not being looked into currently, I can add it to my list of things to fix - but if there is already work being done, I don't want to duplicate it. Even something trivial like what was done for task results would suffice (if we don't want the additional overhead of per-reducer map output generation at the master).
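A rough sanity check of the 1G figure above; the per-pair byte cost here is an assumption chosen purely for illustration, not a measured value:

```python
# 100k mappers x 50k reducers means ~5 billion per-block sizes to convey.
# Even at an assumed (hypothetical) quarter byte per pair after
# compression, the serialized statuses land in the gigabyte range.
M, R = 100_000, 50_000
pairs = M * R                              # 5 billion (map, reduce) pairs
per_pair_bytes = 0.25                      # assumed post-compression cost

print(round(pairs * per_pair_bytes / 2**30, 2))  # ~1.16 GiB
```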
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14352356#comment-14352356 ] Patrick Wendell commented on SPARK-1239: It would be helpful if any users who have observed this could comment on the JIRA and give workload information. This has been more on the back burner since we've heard few reports of it on the mailing list, etc.
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14352306#comment-14352306 ] Kostas Sakellis commented on SPARK-1239: How many reduce-side tasks do you have? Can you please attach your logs that show the OOM errors?
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14352284#comment-14352284 ] Mridul Muralidharan commented on SPARK-1239: [~pwendell] Is there any update on this? It is hitting us fairly commonly, and we are already at 1 GB for the frame size now...
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14182122#comment-14182122 ] Patrick Wendell commented on SPARK-1239: Hey Kostas - there are a few other bugs that required a refactoring to solve them that will subsume this issue, so I think that's why [~joshrosen] is grabbing it. This change will involve fairly significant surgery to a very complex part of Spark, so it might not be the best beginner issue. If Josh's other changes don't end up fixing this, we can leave this open as an independent issue and see if it can be done surgically.
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14182110#comment-14182110 ] Kostas Sakellis commented on SPARK-1239: Apologies for not commenting on this JIRA sooner, but since I'm new to Spark it has taken a bit of time to wrap my head around how the different schedulers (DAG and Task) interoperate. I'm currently investigating completely removing the MapOutputTracker class. Instead, the DAGScheduler can push down the required map statuses to the next stage. I'm thinking of modifying DAGScheduler.submitMissingTasks(..) to take in (or query) the map statuses of the previous completed stage. When a new ShuffleMapTask gets created, we pass in the filtered map statuses necessary for that task. The map status data can be stored in the TaskContext and used in the BlockStoreShuffleFetcher when block data is being read (which currently uses the MapOutputTracker). What I'm investigating currently is how to filter the data when I create the ShuffleMapTask - the BlockStoreShuffleFetcher uses a shuffleId, which I don't seem to have access to in the DAGScheduler. I haven't yet written any code to test this out, so if anyone has any concerns please let me know. Also, [~joshrosen] pointed out that the size of the map output structure, even after it has been filtered, could still be very large. This is worth investigating.
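The push-down idea in the comment above amounts to filtering each map status down to the single reduce partition a task needs before shipping it with the task. A toy sketch (hypothetical names and data shapes, not Spark's actual types):

```python
# Sketch: instead of every reducer fetching the full M*R status table,
# the scheduler extracts just the (mapper address, block size) pairs
# relevant to one reduce partition.
def filter_statuses_for_reducer(map_statuses, reduce_id):
    """map_statuses: list of (mapper_address, sizes_by_reducer) pairs.
    Returns what one reduce task needs: one (address, size) per mapper."""
    return [(addr, sizes[reduce_id]) for addr, sizes in map_statuses]

statuses = [("host-a", [10, 0, 7]), ("host-b", [0, 3, 5])]
print(filter_statuses_for_reducer(statuses, 2))  # [('host-a', 7), ('host-b', 5)]
```

Each task then carries O(M) entries instead of O(M*R), which is exactly the size concern [~joshrosen] raises: O(M) per task is still large when M is in the hundreds of thousands.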
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163423#comment-14163423 ] DB Tsai commented on SPARK-1239: +1, we run into this issue as well.
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134198#comment-14134198 ] Patrick Wendell commented on SPARK-1239: Yes, the current state of the art is to just increase the frame size.
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134160#comment-14134160 ] Andrew Ash commented on SPARK-1239:

For large statuses, would we expect that to exceed {{spark.akka.frameSize}} and cause the exception below?

{noformat}
2014-09-14T01:34:21.305 ERROR [spark-akka.actor.default-dispatcher-4] org.apache.spark.MapOutputTrackerMasterActor - Map output statuses were 13920119 bytes which exceeds spark.akka.frameSize (10485760 bytes).
{noformat}
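As a back-of-envelope check on the numbers in that log line: the default {{spark.akka.frameSize}} in Spark 1.x was 10 MB, which matches the 10485760-byte limit quoted in the exception.

```python
# Default spark.akka.frameSize in Spark 1.x: 10 MB, expressed in bytes.
default_frame_bytes = 10 * 1024 * 1024
assert default_frame_bytes == 10485760  # the limit quoted in the exception

# Serialized map output statuses reported in the log line above.
statuses_bytes = 13920119

# The reply exceeds the frame by roughly a third, so it cannot be sent.
print(statuses_bytes > default_frame_bytes)            # True
print(round(statuses_bytes / default_frame_bytes, 2))  # 1.33
```

So this payload was about 1.33x the default frame, which is exactly the failure mode the "just increase the frame size" workaround papers over.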
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14126215#comment-14126215 ] Andrew Or commented on SPARK-1239:

I have reassigned it to you, Kostas.
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14123883#comment-14123883 ] Kostas Sakellis commented on SPARK-1239:

[~pwendell] I'd like to take a crack at this since it is affecting one of our customers.
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117914#comment-14117914 ] Patrick Wendell commented on SPARK-1239:

I think [~andrewor] has this in his backlog, but it's not actively being worked on. [~bcwalrus], do you or [~sandyr] want to take a crack?