[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204900#comment-15204900
 ] 

Thomas Graves commented on SPARK-1239:
--------------------------------------

So I have been looking at this and testing a few changes out.

There are a few issues here, but if we are looking at solving the driver memory 
bloat issue then I think this comes down to a flow control issue.  The driver is 
trying to respond to all the map status requests and is shoving them out to 
Netty quicker than Netty can send them, so we end up using a lot of memory very 
quickly.  Yes, you can try to reduce the size of the MapStatuses, but you can 
only do that to a point and you could still have this issue.

There are multiple possible ways to solve this.  The approach I have been 
looking at is having the MapOutputTracker have its own queue and thread pool 
for handling requests.  This gives us the flexibility to do multiple things:

- We can make the reply synchronous (i.e. it waits for the response from Netty 
before starting the next reply) without blocking the normal dispatcher threads, 
which do things like handling heartbeats, thus giving us flow control.  We can 
decide to do this only if the map output statuses are above a certain size, or 
do it all the time.  You can adjust the thread pool size to handle more in 
parallel.  We could make this more sophisticated in the future if we want to 
have some sort of send queue rather than blocking each thread.
- We can easily synchronize incoming requests without blocking dispatcher 
threads, so we don't serialize the same MapStatuses multiple times.  Background: 
one other problem I've been seeing is that a bunch of requests for map statuses 
come in at once, we have a lot of dispatcher threads running in parallel, all 
of those check whether the map status is cached, all of them find it's not, and 
you end up with multiple threads all serializing the exact same map output 
statuses.
- It doesn't tie us to sending map statuses with task data, i.e. if we want to 
change Spark in the future to start reducer tasks before all map tasks finish 
(MapReduce does this now), this approach works more easily with that.
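
To make the first bullet concrete, here is a minimal sketch in Python (not Spark 
code) of a dedicated request queue and worker pool in which a worker waits for a 
large reply to finish sending before it takes the next request.  All names are 
hypothetical: FakeTransport stands in for Netty, MapStatusRequestHandler for the 
tracker-side handler, and the 5MB default simply mirrors the size I am still 
deciding on.

```python
import queue
import threading
from concurrent.futures import Future

# Hypothetical threshold; replies at or above this size are sent synchronously.
LARGE_REPLY_BYTES = 5 * 1024 * 1024


class FakeTransport:
    """Stand-in for the network layer: send() returns a Future that
    completes once the payload has been 'flushed'."""

    def __init__(self, flush_delay=0.01):
        self._lock = threading.Lock()
        self._flush_delay = flush_delay
        self.in_flight = 0        # replies handed over but not yet flushed
        self.max_in_flight = 0    # high-water mark, useful for checking flow control

    def send(self, payload):
        done = Future()
        with self._lock:
            self.in_flight += 1
            self.max_in_flight = max(self.max_in_flight, self.in_flight)

        def flush():
            with self._lock:
                self.in_flight -= 1
            done.set_result(len(payload))

        # Simulate the time the network needs to drain the payload.
        threading.Timer(self._flush_delay, flush).start()
        return done


class MapStatusRequestHandler:
    """Own queue plus worker threads, so dispatcher threads never block."""

    def __init__(self, transport, get_statuses, num_threads=2,
                 threshold=LARGE_REPLY_BYTES):
        self._transport = transport
        self._get_statuses = get_statuses   # shuffle_id -> serialized bytes
        self._threshold = threshold
        self._requests = queue.Queue()
        self._threads = [threading.Thread(target=self._loop, daemon=True)
                         for _ in range(num_threads)]
        for t in self._threads:
            t.start()

    def post(self, shuffle_id):
        """Called from a dispatcher thread: enqueue and return immediately."""
        self._requests.put(shuffle_id)

    def _loop(self):
        while True:
            shuffle_id = self._requests.get()
            try:
                if shuffle_id is None:      # shutdown sentinel
                    return
                payload = self._get_statuses(shuffle_id)
                sent = self._transport.send(payload)
                if len(payload) >= self._threshold:
                    # Flow control: only this worker waits for the send.
                    sent.result()
            finally:
                self._requests.task_done()

    def join(self):
        """Block until every queued request has been handled."""
        self._requests.join()

    def stop(self):
        for _ in self._threads:
            self._requests.put(None)
        for t in self._threads:
            t.join()
```

With every reply above the threshold, the number of replies buffered in the 
transport at any moment is bounded by the pool size, which is the knob mentioned 
above for handling more in parallel.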
 
I still need to do some more testing on this, but I wanted to see what people 
think of this approach.

What I have implemented right now is a queue and thread pool in the 
MapOutputTracker to handle the requests.  If the reply is over 5MB (still 
deciding on this size), then the thread waits for it to actually be sent before 
grabbing the next request.
For the second bullet above I took a somewhat simpler approach for now: when 
registerMapOutputs is called, I have it cache the map output statuses then, 
instead of waiting for a request to come in.  This helps because it makes sure 
the last one is cached, but if you have multiple then the others still won't be 
in the cache.  We could either have it cache more, or take an approach like the 
one I mention above and have it just synchronize and cache once upon the first 
request.
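
The two caching options can be sketched as follows.  This is an illustrative 
Python sketch, not the actual MapOutputTracker code: SerializedStatusCache, its 
methods, and the use of pickle as the serializer are all assumptions made for 
the example.  It shows both caching eagerly at registration time and 
synchronizing so that concurrent requests serialize only once.

```python
import pickle
import threading


class SerializedStatusCache:
    """Caches serialized map statuses per shuffle (hypothetical helper)."""

    def __init__(self, serialize=pickle.dumps):
        self._serialize = serialize
        self._lock = threading.Lock()
        self._statuses = {}       # shuffle_id -> list of map statuses
        self._cached = {}         # shuffle_id -> serialized bytes
        self.serialize_calls = 0  # counter, only here to observe behaviour

    def register_map_outputs(self, shuffle_id, statuses, eager=False):
        """Record new statuses; with eager=True, serialize and cache right away."""
        with self._lock:
            self._statuses[shuffle_id] = statuses
            self._cached.pop(shuffle_id, None)  # drop stale bytes, if any
            if eager:
                self._serialize_locked(shuffle_id)

    def get_serialized(self, shuffle_id):
        """Return cached bytes; the check and the serialization happen under
        one lock, so concurrent callers never serialize the same data twice."""
        with self._lock:
            cached = self._cached.get(shuffle_id)
            if cached is None:
                cached = self._serialize_locked(shuffle_id)
            return cached

    def _serialize_locked(self, shuffle_id):
        # Caller must hold self._lock.
        self.serialize_calls += 1
        data = self._serialize(self._statuses[shuffle_id])
        self._cached[shuffle_id] = data
        return data
```

A single lock keeps the sketch short, but it also makes requests for different 
shuffles wait on each other; a per-shuffle lock would avoid that at the cost of 
a little more bookkeeping.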

One of the large jobs I'm using to test this is shuffling 15TB of data using 
202000 map tasks going down to 500 reducers.  The driver originally was using 
20GB of memory; with my changes I was able to run it successfully with 5GB.

> Don't fetch all map output statuses at each reducer during shuffles
> -------------------------------------------------------------------
>
>                 Key: SPARK-1239
>                 URL: https://issues.apache.org/jira/browse/SPARK-1239
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.0.2, 1.1.0
>            Reporter: Patrick Wendell
>            Assignee: Thomas Graves
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 



