Re: Eliminate copy while sending data : any Akka experts here ?

2014-11-20 Thread Reynold Xin
Can you elaborate? Not 100% sure if I understand what you mean.

On Thu, Nov 20, 2014 at 7:14 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 Is it possible for Spark to buffer the mapOutputStatuses (Array[Byte])
 messages that have already been sent but not yet ACKed? The buffer
 would be cheap, since the mapOutputStatuses messages are identical and
 the memory cost is only a few pointers.

 Best Regards,
 Shixiong Zhu

 2014-09-20 16:24 GMT+08:00 Reynold Xin r...@databricks.com:

 BTW - a partial solution here: https://github.com/apache/spark/pull/2470

  This doesn't address the zero-sized block problem yet, but makes my large
  job on hundreds of terabytes of data much more reliable.


 On Fri, Jul 4, 2014 at 2:28 AM, Mridul Muralidharan mri...@gmail.com
 wrote:

  In our clusters, the number of containers we can get is high but the
  memory per container is low : which is why avg_nodes_not_hosting_data
  is rarely zero for ML tasks :-)
 
  To update - to unblock our current implementation efforts, we went
  with broadcast - since it is intuitively easier and a minimal change; and
  compress the array as bytes in TaskResult.
  This is then stored in disk-backed maps - to remove memory pressure on
  the master and workers (else MapOutputTracker becomes a memory hog).
 
  But I agree, a compressed bitmap to represent 'large' blocks (anything
  larger than maxBytesInFlight, actually) and probably an existing one to
  track non-zero blocks should be fine (we should not really track zero
  output for a reducer - just a waste of space).
 
 
  Regards,
  Mridul
 
  On Fri, Jul 4, 2014 at 3:43 AM, Reynold Xin r...@databricks.com
 wrote:
   Note that in my original proposal, I was suggesting we could track
  whether
   block size = 0 using a compressed bitmap. That way we can still avoid
   requests for zero-sized blocks.
  
  
  
   On Thu, Jul 3, 2014 at 3:12 PM, Reynold Xin r...@databricks.com
 wrote:
  
   Yes, that number is likely == 0 in any real workload ...
  
  
   On Thu, Jul 3, 2014 at 8:01 AM, Mridul Muralidharan 
 mri...@gmail.com
   wrote:
  
   On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin r...@databricks.com
  wrote:
On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan 
  mri...@gmail.com
wrote:
   
   

 The other thing we do need is the location of blocks. This is
   actually
just
 O(n) because we just need to know where the map was run.
   
 For well-partitioned data, won't this involve a lot of unwanted
 requests to nodes which are not hosting data for a reducer (and no
 ability to throttle)?
   
   
Was that a question? (I'm guessing it is). What do you mean
 exactly?
  
  
    I was not sure if I understood the proposal correctly - hence the
    query : if I understood it right, the number of wasted requests
    goes up by num_reducers * avg_nodes_not_hosting_data.

    Of course, if avg_nodes_not_hosting_data == 0, then we are fine !
  
   Regards,
   Mridul
  
  
  
 





Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-04 Thread Mridul Muralidharan
In our clusters, the number of containers we can get is high but the
memory per container is low : which is why avg_nodes_not_hosting_data
is rarely zero for ML tasks :-)

To update - to unblock our current implementation efforts, we went
with broadcast - since it is intuitively easier and a minimal change; and
compress the array as bytes in TaskResult.
This is then stored in disk-backed maps - to remove memory pressure on
the master and workers (else MapOutputTracker becomes a memory hog).

But I agree, a compressed bitmap to represent 'large' blocks (anything
larger than maxBytesInFlight, actually) and probably an existing one to
track non-zero blocks should be fine (we should not really track zero
output for a reducer - just a waste of space).


Regards,
Mridul

On Fri, Jul 4, 2014 at 3:43 AM, Reynold Xin r...@databricks.com wrote:
 Note that in my original proposal, I was suggesting we could track whether
 block size = 0 using a compressed bitmap. That way we can still avoid
 requests for zero-sized blocks.



 On Thu, Jul 3, 2014 at 3:12 PM, Reynold Xin r...@databricks.com wrote:

 Yes, that number is likely == 0 in any real workload ...


 On Thu, Jul 3, 2014 at 8:01 AM, Mridul Muralidharan mri...@gmail.com
 wrote:

 On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin r...@databricks.com wrote:
  On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan mri...@gmail.com
  wrote:
 
 
  
   The other thing we do need is the location of blocks. This is
 actually
  just
   O(n) because we just need to know where the map was run.
 
  For well-partitioned data, won't this involve a lot of unwanted
  requests to nodes which are not hosting data for a reducer (and no
  ability to throttle)?
 
 
  Was that a question? (I'm guessing it is). What do you mean exactly?


 I was not sure if I understood the proposal correctly - hence the
 query : if I understood it right, the number of wasted requests goes
 up by num_reducers * avg_nodes_not_hosting_data.

 Of course, if avg_nodes_not_hosting_data == 0, then we are fine !

 Regards,
 Mridul





Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-03 Thread Mridul Muralidharan
On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin r...@databricks.com wrote:
 On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan mri...@gmail.com
 wrote:


 
  The other thing we do need is the location of blocks. This is actually
 just
  O(n) because we just need to know where the map was run.

 For well-partitioned data, won't this involve a lot of unwanted
 requests to nodes which are not hosting data for a reducer (and no
 ability to throttle)?


 Was that a question? (I'm guessing it is). What do you mean exactly?


I was not sure if I understood the proposal correctly - hence the
query : if I understood it right, the number of wasted requests goes
up by num_reducers * avg_nodes_not_hosting_data.

Of course, if avg_nodes_not_hosting_data == 0, then we are fine !

Regards,
Mridul


Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-02 Thread Mridul Muralidharan
Hi Patrick,

  Please see inline.

Regards,
Mridul


On Wed, Jul 2, 2014 at 10:52 AM, Patrick Wendell pwend...@gmail.com wrote:
 b) Instead of pulling this information, push it to executors as part
 of task submission. (What Patrick mentioned ?)
 (1) a.1 from above is still an issue for this.

 I don't understand what problem a.1 is. In this case, we don't need to
 do caching, right?


To rephrase in this context, attempting to cache won't help, since it is
reducer-specific and the benefits are minimal (other than for re-execution
on failures and for speculative tasks).



 (2) Serialized task size is also a concern : we have already seen
 users hitting akka limits for task size - this will be an additional
 vector which might exacerbate it.

 This would add only a small, constant amount of data to the task. It's
 strictly better than before. Before, if the map output status array was
 size M x R, we sent a single akka message to every node of size M x
 R... this basically scales quadratically with the size of the RDD. The
 new approach is constant... it's much better. And the total amount of
 data sent over the wire is likely much less.


It would be a function of the number of mappers - and an overhead for each task.


Regards,
Mridul


 - Patrick


Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-02 Thread Mridul Muralidharan
Hi Reynold,

  Please see inline.

Regards,
Mridul

On Wed, Jul 2, 2014 at 10:57 AM, Reynold Xin r...@databricks.com wrote:
 I was actually talking to tgraves today at the summit about this.

 Based on my understanding, the sizes we track and send (which are
 unfortunately O(M*R) regardless of how we change the implementation --
 whether we send via task or send via MapOutputTracker) are only used to
 compute maxBytesInFlight so we can throttle the fetching speed to not
 result in OOM. Perhaps for very large shuffles, we don't need to send the
 bytes for each block, and we can send whether they are zero or not (which
 can be tracked via a compressed bitmap that can be tiny).

You are right - currently, for large blocks, we just need to know where
the block exists.
I was not sure if there was any possible future extension on that -
for this reason, in order to preserve functionality, we moved from
Byte to Short for MapOutputTracker.compressedSize (to ensure large
sizes can be represented with 0.7% error).

Within a MapStatus, we moved to holding compressed data to save on
space within master/workers (particularly for large number of
reducers).

If we do not anticipate any other use for size, we can move back
to using Byte instead of Short to compress sizes (which will reduce the
required space by some factor less than 2) : the error in the computed
size for blocks larger than maxBytesInFlight does not really matter,
since we will split them into different FetchRequests anyway.
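A hedged illustration of this trade-off (not the actual MapOutputTracker
code): encode a size as the exponent of a fixed base. With one Byte and
base 1.1 the decoded size is within roughly 10% of the real one; a Short
with a base much closer to 1 would give the ~0.7% error mentioned above.

```scala
// Illustrative log-based size compression: store sizes as an exponent
// of Base. Error is harmless for blocks above maxBytesInFlight, since
// those get split into separate FetchRequests anyway.
object SizeCompression {
  private val Base = 1.1

  def compress(size: Long): Byte =
    if (size <= 1L) size.toByte
    else math.min(
      math.ceil(math.log(size.toDouble) / math.log(Base)).toInt,
      255).toByte                       // caps at Base^255 (~36 GB here)

  def decompress(compressed: Byte): Long = {
    val b = compressed & 0xFF           // treat the byte as unsigned
    if (b <= 1) b.toLong else math.pow(Base, b.toDouble).toLong
  }
}
```

Since `compress` rounds the exponent up, the decoded size slightly
overestimates the true one, which errs on the safe side for throttling.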



 The other thing we do need is the location of blocks. This is actually just
 O(n) because we just need to know where the map was run.

For well-partitioned data, won't this involve a lot of unwanted
requests to nodes which are not hosting data for a reducer (and no
ability to throttle)?


Regards,
Mridul



 On Tue, Jul 1, 2014 at 2:51 AM, Mridul Muralidharan mri...@gmail.com
 wrote:

 We had considered both approaches (if I understood the suggestions right) :
 a) Pulling only map output states for tasks which run on the reducer
 by modifying the Actor. (Probably along lines of what Aaron described
 ?)
 The performance implication of this was bad :
 1) We can't cache the serialized result anymore (caching it makes no
 sense, rather).
 2) The number of requests to the master will go from num_executors to
 num_reducers - the latter can be orders of magnitude higher than the
 former.

 b) Instead of pulling this information, push it to executors as part
 of task submission. (What Patrick mentioned ?)
 (1) a.1 from above is still an issue for this.
 (2) Serialized task size is also a concern : we have already seen
 users hitting akka limits for task size - this will be an additional
 vector which might exacerbate it.
 Our jobs are not hitting this yet though !

 I was hoping there might be something in akka itself to alleviate this
 - but if not, we can solve it within context of spark.

 Currently, we have worked around it by using broadcast variable when
 serialized size is above some threshold - so that our immediate
 concerns are unblocked :-)
 But a better solution should be greatly welcomed !
 Maybe we can unify it with large serialized task as well ...


 Btw, I am not sure what the higher cost of BlockManager referred to is,
 Aaron - do you mean the cost of persisting the serialized map outputs
 to disk ?




 Regards,
 Mridul


 On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell pwend...@gmail.com
 wrote:
  Yeah I created a JIRA a while back to piggy-back the map status info
  on top of the task (I honestly think it will be a small change). There
  isn't a good reason to broadcast the entire array and it can be an
  issue during large shuffles.
 
  - Patrick
 
  On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson ilike...@gmail.com
 wrote:
  I don't know of any way to avoid Akka doing a copy, but I would like to
  mention that it's on the priority list to piggy-back only the map
 statuses
  relevant to a particular map task on the task itself, thus reducing the
  total amount of data sent over the wire by a factor of N for N physical
  machines in your cluster. Ideally we would also avoid Akka entirely when
  sending the tasks, as these can get somewhat large and Akka doesn't work
  well with large messages.
 
  Do note that your solution of using broadcast to send the map tasks is
 very
  similar to how the executor returns the result of a task when it's too
 big
  for akka. We were thinking of refactoring this too, as using the block
  manager has much higher latency than a direct TCP send.
 
 
  On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan mri...@gmail.com
 
  wrote:
 
  Our current hack is to use Broadcast variables when serialized
  statuses are above some (configurable) size : and have the workers
  directly pull them from master.
  This is a workaround : so would be great if there was a
  better/principled solution.
 
  Please note that the responses are going to different workers
  requesting the output statuses for shuffle (after 

Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-01 Thread Patrick Wendell
Yeah I created a JIRA a while back to piggy-back the map status info
on top of the task (I honestly think it will be a small change). There
isn't a good reason to broadcast the entire array and it can be an
issue during large shuffles.

- Patrick

On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson ilike...@gmail.com wrote:
 I don't know of any way to avoid Akka doing a copy, but I would like to
 mention that it's on the priority list to piggy-back only the map statuses
 relevant to a particular map task on the task itself, thus reducing the
 total amount of data sent over the wire by a factor of N for N physical
 machines in your cluster. Ideally we would also avoid Akka entirely when
 sending the tasks, as these can get somewhat large and Akka doesn't work
 well with large messages.

 Do note that your solution of using broadcast to send the map tasks is very
 similar to how the executor returns the result of a task when it's too big
 for akka. We were thinking of refactoring this too, as using the block
 manager has much higher latency than a direct TCP send.


 On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan mri...@gmail.com
 wrote:

 Our current hack is to use Broadcast variables when serialized
 statuses are above some (configurable) size : and have the workers
 directly pull them from master.
 This is a workaround : so would be great if there was a
 better/principled solution.

 Please note that the responses are going to different workers
 requesting the output statuses for shuffle (after map) - so I'm not
 sure if back-pressure buffers, etc. would help.


 Regards,
 Mridul


 On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan mri...@gmail.com
 wrote:
  Hi,
 
   While sending the map output tracker result, the same serialized byte
   array is sent multiple times - but the akka implementation copies it
   to a private byte array within ByteString for each send.
   Caching a ByteString instead of an Array[Byte] did not help, since akka
   does not special-case ByteString : it serializes the ByteString, and
   copies the result out to an array before creating a ByteString out of
   it (serializing an Array[Byte] thankfully just returns the same array -
   so only one copy).
 
 
  Given the need to send immutable data large number of times, is there
  any way to do it in akka without copying internally in akka ?
 
 
   To see how expensive it is : for 200 nodes with a large number of
   mappers and reducers, the statuses become something like 30 MB for us -
   and pulling this about 200 to 300 times results in OOM due to the
   large number of copies sent out.
 
 
  Thanks,
  Mridul



Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-01 Thread Mridul Muralidharan
We had considered both approaches (if I understood the suggestions right) :
a) Pulling only map output states for tasks which run on the reducer
by modifying the Actor. (Probably along lines of what Aaron described
?)
The performance implication of this was bad :
1) We can't cache the serialized result anymore (caching it makes no
sense, rather).
2) The number of requests to the master will go from num_executors to
num_reducers - the latter can be orders of magnitude higher than the
former.

b) Instead of pulling this information, push it to executors as part
of task submission. (What Patrick mentioned ?)
(1) a.1 from above is still an issue for this.
(2) Serialized task size is also a concern : we have already seen
users hitting akka limits for task size - this will be an additional
vector which might exacerbate it.
Our jobs are not hitting this yet though !

I was hoping there might be something in akka itself to alleviate this
- but if not, we can solve it within context of spark.

Currently, we have worked around it by using a broadcast variable when
the serialized size is above some threshold - so that our immediate
concerns are unblocked :-)
But a better solution would be greatly welcomed !
Maybe we can unify it with large serialized tasks as well ...
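The threshold-based workaround described here could look roughly like the
following sketch (the names - StatusServer, publish, FetchFromBroadcast -
are assumptions for illustration, not Spark's real API): small serialized
statuses are answered inline through the actor, large ones are published
once and pulled by the workers.

```scala
// Sketch: dispatch on serialized size; large payloads are published
// once (e.g. as a broadcast variable) and workers fetch them by id.
sealed trait StatusReply
case class DirectStatuses(bytes: Array[Byte]) extends StatusReply
case class FetchFromBroadcast(broadcastId: Long) extends StatusReply

class StatusServer(thresholdBytes: Int, publish: Array[Byte] => Long) {
  def reply(serialized: Array[Byte]): StatusReply =
    if (serialized.length <= thresholdBytes) DirectStatuses(serialized)
    else FetchFromBroadcast(publish(serialized))
}
```

The design point is that the master serializes and publishes the large
payload once, instead of pushing a fresh copy per requesting worker.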


Btw, I am not sure what the higher cost of BlockManager referred to is,
Aaron - do you mean the cost of persisting the serialized map outputs
to disk ?




Regards,
Mridul


On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell pwend...@gmail.com wrote:
 Yeah I created a JIRA a while back to piggy-back the map status info
 on top of the task (I honestly think it will be a small change). There
 isn't a good reason to broadcast the entire array and it can be an
 issue during large shuffles.

 - Patrick

 On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson ilike...@gmail.com wrote:
 I don't know of any way to avoid Akka doing a copy, but I would like to
 mention that it's on the priority list to piggy-back only the map statuses
 relevant to a particular map task on the task itself, thus reducing the
 total amount of data sent over the wire by a factor of N for N physical
 machines in your cluster. Ideally we would also avoid Akka entirely when
 sending the tasks, as these can get somewhat large and Akka doesn't work
 well with large messages.

 Do note that your solution of using broadcast to send the map tasks is very
 similar to how the executor returns the result of a task when it's too big
 for akka. We were thinking of refactoring this too, as using the block
 manager has much higher latency than a direct TCP send.


 On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan mri...@gmail.com
 wrote:

 Our current hack is to use Broadcast variables when serialized
 statuses are above some (configurable) size : and have the workers
 directly pull them from master.
 This is a workaround : so would be great if there was a
 better/principled solution.

 Please note that the responses are going to different workers
 requesting the output statuses for shuffle (after map) - so I'm not
 sure if back-pressure buffers, etc. would help.


 Regards,
 Mridul


 On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan mri...@gmail.com
 wrote:
  Hi,
 
  While sending the map output tracker result, the same serialized byte
  array is sent multiple times - but the akka implementation copies it
  to a private byte array within ByteString for each send.
  Caching a ByteString instead of an Array[Byte] did not help, since akka
  does not special-case ByteString : it serializes the ByteString, and
  copies the result out to an array before creating a ByteString out of
  it (serializing an Array[Byte] thankfully just returns the same array -
  so only one copy).
 
 
  Given the need to send immutable data large number of times, is there
  any way to do it in akka without copying internally in akka ?
 
 
  To see how expensive it is : for 200 nodes with a large number of
  mappers and reducers, the statuses become something like 30 MB for us -
  and pulling this about 200 to 300 times results in OOM due to the
  large number of copies sent out.
 
 
  Thanks,
  Mridul



Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-01 Thread Patrick Wendell
 b) Instead of pulling this information, push it to executors as part
 of task submission. (What Patrick mentioned ?)
 (1) a.1 from above is still an issue for this.

I don't understand what problem a.1 is. In this case, we don't need to
do caching, right?

 (2) Serialized task size is also a concern : we have already seen
 users hitting akka limits for task size - this will be an additional
 vector which might exacerbate it.

This would add only a small, constant amount of data to the task. It's
strictly better than before. Before, if the map output status array was
size M x R, we sent a single akka message to every node of size M x
R... this basically scales quadratically with the size of the RDD. The
new approach is constant... it's much better. And the total amount of
data sent over the wire is likely much less.
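Back-of-the-envelope arithmetic for the comparison above, with assumed
numbers (E executors, M maps, R reduces, S bytes per tracked size): the
old scheme ships the full M x R array to every executor, while
piggy-backing gives each reduce task only its own row of M sizes, so the
total over the wire drops by a factor of E.

```scala
// Simplified wire-cost model for the two schemes discussed above.
object ShuffleWireCost {
  // Old scheme: each of E executors receives the full M x R size array.
  def fullArrayTotal(executors: Long, maps: Long,
                     reduces: Long, bytesPerSize: Long): Long =
    executors * maps * reduces * bytesPerSize

  // Piggy-backed scheme: each of R reduce tasks carries only its own
  // row of M sizes, so the per-task payload no longer grows with R.
  def perTaskTotal(maps: Long, reduces: Long, bytesPerSize: Long): Long =
    maps * reduces * bytesPerSize
}
```

For example, with 200 executors and a 10000 x 10000 shuffle at one byte
per size, the totals are 20 GB versus 100 MB - the factor-of-E saving
Aaron's "factor of N for N physical machines" refers to.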

- Patrick


Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-01 Thread Reynold Xin
I was actually talking to tgraves today at the summit about this.

Based on my understanding, the sizes we track and send (which are
unfortunately O(M*R) regardless of how we change the implementation --
whether we send via task or send via MapOutputTracker) are only used to
compute maxBytesInFlight so we can throttle the fetching speed to not
result in OOM. Perhaps for very large shuffles, we don't need to send the
bytes for each block, and we can send whether they are zero or not (which
can be tracked via a compressed bitmap that can be tiny).

The other thing we do need is the location of blocks. This is actually just
O(n) because we just need to know where the map was run.
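One way the bitmap idea could look (a sketch only: the standard-library
BitSet stands in for a properly compressed bitmap such as a Roaring
bitmap, and the names are illustrative, not Spark's MapStatus API): one
bit per reduce partition, set iff the map produced a non-empty block, so
reducers can skip requests for zero-sized blocks entirely.

```scala
import scala.collection.immutable.BitSet

// Sketch: track only location plus a non-empty bitmap instead of the
// full array of per-reducer sizes.
case class CompactMapStatus(location: String, nonEmpty: BitSet) {
  def hasBlock(reduceId: Int): Boolean = nonEmpty(reduceId)
}

object CompactMapStatus {
  // Build the bitmap from the per-reducer output sizes of one map.
  def fromSizes(location: String, sizes: Array[Long]): CompactMapStatus =
    CompactMapStatus(location,
      BitSet(sizes.indices.filter(i => sizes(i) > 0L): _*))
}
```

For mostly-empty outputs the bitmap compresses extremely well, which is
what makes it "tiny" compared to the O(M*R) size array.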


On Tue, Jul 1, 2014 at 2:51 AM, Mridul Muralidharan mri...@gmail.com
wrote:

 We had considered both approaches (if I understood the suggestions right) :
 a) Pulling only map output states for tasks which run on the reducer
 by modifying the Actor. (Probably along lines of what Aaron described
 ?)
 The performance implication of this was bad :
 1) We can't cache the serialized result anymore (caching it makes no
 sense, rather).
 2) The number of requests to the master will go from num_executors to
 num_reducers - the latter can be orders of magnitude higher than the
 former.

 b) Instead of pulling this information, push it to executors as part
 of task submission. (What Patrick mentioned ?)
 (1) a.1 from above is still an issue for this.
 (2) Serialized task size is also a concern : we have already seen
 users hitting akka limits for task size - this will be an additional
 vector which might exacerbate it.
 Our jobs are not hitting this yet though !

 I was hoping there might be something in akka itself to alleviate this
 - but if not, we can solve it within context of spark.

 Currently, we have worked around it by using broadcast variable when
 serialized size is above some threshold - so that our immediate
 concerns are unblocked :-)
 But a better solution should be greatly welcomed !
 Maybe we can unify it with large serialized task as well ...


 Btw, I am not sure what the higher cost of BlockManager referred to is,
 Aaron - do you mean the cost of persisting the serialized map outputs
 to disk ?




 Regards,
 Mridul


 On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell pwend...@gmail.com
 wrote:
  Yeah I created a JIRA a while back to piggy-back the map status info
  on top of the task (I honestly think it will be a small change). There
  isn't a good reason to broadcast the entire array and it can be an
  issue during large shuffles.
 
  - Patrick
 
  On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson ilike...@gmail.com
 wrote:
  I don't know of any way to avoid Akka doing a copy, but I would like to
  mention that it's on the priority list to piggy-back only the map
 statuses
  relevant to a particular map task on the task itself, thus reducing the
  total amount of data sent over the wire by a factor of N for N physical
  machines in your cluster. Ideally we would also avoid Akka entirely when
  sending the tasks, as these can get somewhat large and Akka doesn't work
  well with large messages.
 
  Do note that your solution of using broadcast to send the map tasks is
 very
  similar to how the executor returns the result of a task when it's too
 big
  for akka. We were thinking of refactoring this too, as using the block
  manager has much higher latency than a direct TCP send.
 
 
  On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan mri...@gmail.com
 
  wrote:
 
  Our current hack is to use Broadcast variables when serialized
  statuses are above some (configurable) size : and have the workers
  directly pull them from master.
  This is a workaround : so would be great if there was a
  better/principled solution.
 
   Please note that the responses are going to different workers
   requesting the output statuses for shuffle (after map) - so I'm not
   sure if back-pressure buffers, etc. would help.
 
 
  Regards,
  Mridul
 
 
  On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan 
 mri...@gmail.com
  wrote:
   Hi,
  
    While sending the map output tracker result, the same serialized byte
    array is sent multiple times - but the akka implementation copies it
    to a private byte array within ByteString for each send.
    Caching a ByteString instead of an Array[Byte] did not help, since akka
    does not special-case ByteString : it serializes the ByteString, and
    copies the result out to an array before creating a ByteString out of
    it (serializing an Array[Byte] thankfully just returns the same array -
    so only one copy).
  
  
   Given the need to send immutable data large number of times, is there
   any way to do it in akka without copying internally in akka ?
  
  
    To see how expensive it is : for 200 nodes with a large number of
    mappers and reducers, the statuses become something like 30 MB for
    us - and pulling this about 200 to 300 times results in OOM due to
    the large number of copies sent out.
  
  
   Thanks,
   

Re: Eliminate copy while sending data : any Akka experts here ?

2014-06-30 Thread Aaron Davidson
I don't know of any way to avoid Akka doing a copy, but I would like to
mention that it's on the priority list to piggy-back only the map statuses
relevant to a particular map task on the task itself, thus reducing the
total amount of data sent over the wire by a factor of N for N physical
machines in your cluster. Ideally we would also avoid Akka entirely when
sending the tasks, as these can get somewhat large and Akka doesn't work
well with large messages.

Do note that your solution of using broadcast to send the map tasks is very
similar to how the executor returns the result of a task when it's too big
for akka. We were thinking of refactoring this too, as using the block
manager has much higher latency than a direct TCP send.


On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan mri...@gmail.com
wrote:

 Our current hack is to use Broadcast variables when serialized
 statuses are above some (configurable) size : and have the workers
 directly pull them from master.
 This is a workaround : so would be great if there was a
 better/principled solution.

  Please note that the responses are going to different workers
  requesting the output statuses for shuffle (after map) - so I'm not
  sure if back-pressure buffers, etc. would help.


 Regards,
 Mridul


 On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan mri...@gmail.com
 wrote:
  Hi,
 
   While sending the map output tracker result, the same serialized byte
  array is sent multiple times - but the akka implementation copies it
  to a private byte array within ByteString for each send.
  Caching a ByteString instead of an Array[Byte] did not help, since akka
  does not special-case ByteString : it serializes the ByteString, and
  copies the result out to an array before creating a ByteString out of
  it (serializing an Array[Byte] thankfully just returns the same array -
  so only one copy).
 
 
  Given the need to send immutable data large number of times, is there
  any way to do it in akka without copying internally in akka ?
 
 
  To see how expensive it is : for 200 nodes with a large number of
  mappers and reducers, the statuses become something like 30 MB for us -
  and pulling this about 200 to 300 times results in OOM due to the
  large number of copies sent out.
 
 
  Thanks,
  Mridul