Re: Eliminate copy while sending data : any Akka experts here ?
MapOutputTrackerMasterActor first puts the `mapOutputStatuses` messages into a buffer. The messages in this buffer are then sent by background threads; these threads check whether too many messages are already in flight to Akka, and if so, they wait until enough memory is available. I put a commit for this idea here: https://github.com/zsxwing/spark/commit/c998856cdf747aa0452d030e58c3c2dd4ef7f97d

Best Regards,
Shixiong Zhu
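The buffering scheme described above can be sketched roughly as follows. `ThrottledSender`, `maxBytesInFlight`, and the `onAck` callback are illustrative names, not Spark's or Akka's actual API; real code would hook `onAck` into Akka's acknowledgement path:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

// Simplified sketch of the buffered sender described above: messages are
// queued, and a drain loop only sends while the un-ACKed byte count stays
// below a limit. All names here are illustrative, not Spark's actual API.
class ThrottledSender(maxBytesInFlight: Long, send: Array[Byte] => Unit) {
  private val queue = new ConcurrentLinkedQueue[Array[Byte]]()
  private val inFlight = new AtomicLong(0)

  def enqueue(msg: Array[Byte]): Unit = queue.add(msg)

  // Called when the receiver ACKs a message of the given size.
  def onAck(bytes: Long): Unit = inFlight.addAndGet(-bytes)

  // Drain as many queued messages as the in-flight budget allows;
  // returns how many were actually sent.
  def drain(): Int = {
    var sent = 0
    var msg = queue.peek()
    while (msg != null && inFlight.get() + msg.length <= maxBytesInFlight) {
      queue.poll()
      inFlight.addAndGet(msg.length)
      send(msg)
      sent += 1
      msg = queue.peek()
    }
    sent
  }
}
```

A background thread (or the ACK handler itself) would call `drain()` whenever the in-flight budget frees up.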
Re: Eliminate copy while sending data : any Akka experts here ?
Can you elaborate? Not 100% sure if I understand what you mean.
Re: Eliminate copy while sending data : any Akka experts here ?
Is it possible that Spark buffers the mapOutputStatuses (Array[Byte]) messages according to the size of the mapOutputStatuses that have already been sent but not yet ACKed? The buffer would be cheap, since the mapOutputStatuses messages are identical and the memory cost is only a few pointers.

Best Regards,
Shixiong Zhu
Re: Eliminate copy while sending data : any Akka experts here ?
BTW - a partial solution here: https://github.com/apache/spark/pull/2470

This doesn't address the zero-size block problem yet, but it makes my large job on hundreds of terabytes of data much more reliable.
Re: Eliminate copy while sending data : any Akka experts here ?
In our clusters, the number of containers we can get is high but memory per container is low, which is why avg_nodes_not_hosting_data is rarely zero for ML tasks :-)

To update - to unblock our current implementation efforts, we went with broadcast, since it is intuitively easier and a minimal change, and we compress the array as bytes in TaskResult. This is then stored in disk-backed maps, to remove memory pressure on master and workers (else MapOutputTracker becomes a memory hog).

But I agree, a compressed bitmap to represent 'large' blocks (anything larger than maxBytesInFlight, actually) plus probably the existing scheme to track non-zero blocks should be fine (we should not really track zero output for a reducer - just a waste of space).

Regards,
Mridul
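The workaround described in this message - broadcast when the serialized statuses get large - might look roughly like this. The `Broadcaster` trait, the payload types, and the threshold handling are illustrative stand-ins, not Spark's real broadcast API:

```scala
// Sketch: serialize the statuses once, then choose the delivery path by
// size. Small payloads go directly over Akka; large ones are published
// through a broadcast mechanism and only a small id travels in the message.
trait Broadcaster { def publish(data: Array[Byte]): String } // returns a blob id

sealed trait StatusPayload
case class Direct(bytes: Array[Byte]) extends StatusPayload
case class ViaBroadcast(id: String) extends StatusPayload

def packStatuses(serialized: Array[Byte], threshold: Int,
                 bc: Broadcaster): StatusPayload =
  if (serialized.length <= threshold) Direct(serialized)
  else ViaBroadcast(bc.publish(serialized))
```

Workers receiving a `ViaBroadcast` id would then pull the bytes from the broadcast store instead of the master's actor.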
Re: Eliminate copy while sending data : any Akka experts here ?
Yes, that number is likely == 0 in any real workload ...
Re: Eliminate copy while sending data : any Akka experts here ?
Note that in my original proposal, I was suggesting we could track whether block size = 0 using a compressed bitmap. That way we can still avoid requests for zero-sized blocks.
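A minimal sketch of the zero-block bitmap idea above, with `java.util.BitSet` standing in for a properly compressed bitmap (e.g. a roaring bitmap):

```scala
import java.util.BitSet

// Sketch of the bitmap idea: for one map output, record which reduce
// partitions produced zero bytes, so reducers can skip those fetches
// entirely. java.util.BitSet is an illustrative stand-in for a
// compressed bitmap implementation.
class ZeroBlockTracker(numReducers: Int) {
  private val empty = new BitSet(numReducers)

  // Mark every zero-sized block for this map output.
  def record(sizes: Array[Long]): Unit =
    sizes.zipWithIndex.foreach { case (s, i) => if (s == 0L) empty.set(i) }

  def isEmpty(reduceId: Int): Boolean = empty.get(reduceId)
}
```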
Re: Eliminate copy while sending data : any Akka experts here ?
I was not sure if I understood the proposal correctly - hence the query: if I understood it right, the number of wasted requests goes up by num_reducers * avg_nodes_not_hosting_data.

Of course, if avg_nodes_not_hosting_data == 0, then we are fine!

Regards,
Mridul
Re: Eliminate copy while sending data : any Akka experts here ?
On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan wrote:
>
> > The other thing we do need is the location of blocks. This is actually
> > just O(n) because we just need to know where the map was run.
>
> For well partitioned data, won't this involve a lot of unwanted
> requests to nodes which are not hosting data for a reducer (and a lack
> of ability to throttle)?

Was that a question? (I'm guessing it is.) What do you mean exactly?
Re: Eliminate copy while sending data : any Akka experts here ?
Hi Reynold,

Please see inline.

Regards,
Mridul

On Wed, Jul 2, 2014 at 10:57 AM, Reynold Xin wrote:
> I was actually talking to tgraves today at the summit about this.
>
> Based on my understanding, the sizes we track and send (which is
> unfortunately O(M*R) regardless of how we change the implementation --
> whether we send via task or send via MapOutputTracker) is only used to
> compute maxBytesInFlight so we can throttle the fetching speed to not
> result in oom. Perhaps for very large shuffles, we don't need to send the
> bytes for each block, and we can send whether they are zero or not (which
> can be tracked via a compressed bitmap that can be tiny).

You are right: currently, for large blocks, we just need to know where the block exists. I was not sure if there was any possible future extension of that; for this reason, in order to preserve functionality, we moved to using Short instead of Byte for MapOutputTracker.compressedSize (to ensure large sizes can be represented with 0.7% error). Within a MapStatus, we moved to holding compressed data to save space within master/workers (particularly for a large number of reducers).

If we do not anticipate any other use for "size", we can move back to using Byte instead of Short to compress size (which will reduce the required space by some factor less than 2), since the error in the computed size for blocks larger than maxBytesInFlight does not really matter: we will split them into different FetchRequests anyway.

> The other thing we do need is the location of blocks. This is actually just
> O(n) because we just need to know where the map was run.

For well partitioned data, won't this involve a lot of unwanted requests to nodes which are not hosting data for a reducer (and a lack of ability to throttle)?
Regards,
Mridul
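The Byte-versus-Short trade-off for compressedSize discussed in this message comes down to log-bucketing: fewer bits means a coarser base and a larger worst-case overestimate. An illustrative sketch (not Spark's exact implementation) with base 1.1 and one byte per size:

```scala
// Illustrative log-bucketed size encoding: a byte holds
// ceil(log_1.1(size)), so decoding overestimates the true size by at
// most ~10%. This is a sketch of the idea, not the exact Spark code.
val LogBase = 1.1

def compressSize(size: Long): Byte =
  if (size == 0L) 0
  else math.min(255, math.max(1,
    math.ceil(math.log(size.toDouble) / math.log(LogBase)).toInt)).toByte

def decompressSize(compressed: Byte): Long =
  if (compressed == 0) 0L
  else math.pow(LogBase, (compressed & 0xFF).toDouble).toLong
```

With a Short instead of a Byte, the same scheme allows a base much closer to 1.0, bringing the worst-case error under 1% - the kind of figure mentioned above - at twice the space.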
Re: Eliminate copy while sending data : any Akka experts here ?
Hi Patrick,

Please see inline.

Regards,
Mridul

On Wed, Jul 2, 2014 at 10:52 AM, Patrick Wendell wrote:
>> b) Instead of pulling this information, push it to executors as part
>> of task submission. (What Patrick mentioned ?)
>> (1) a.1 from above is still an issue for this.
>
> I don't understand what problem a.1 is. In this case, we don't need to do
> caching, right?

To rephrase in this context: attempting to cache won't help, since the result is reducer specific and the benefits are minimal (other than for re-execution on failures and for speculative tasks).

>> (2) Serialized task size is also a concern : we have already seen
>> users hitting akka limits for task size - this will be an additional
>> vector which might exacerbate it.
>
> This would add only a small, constant amount of data to the task. It's
> strictly better than before. Before if the map output status array was
> size M x R, we send a single akka message to every node of size M x
> R... this basically scales quadratically with the size of the RDD. The
> new approach is constant... it's much better. And the total amount of
> data sent over the wire is likely much less.
>
> - Patrick

It would be a function of the number of mappers - an overhead for each task.

Regards,
Mridul
Re: Eliminate copy while sending data : any Akka experts here ?
I was actually talking to tgraves today at the summit about this.

Based on my understanding, the sizes we track and send (which are unfortunately O(M*R) regardless of how we change the implementation -- whether we send via task or via MapOutputTracker) are only used to compute maxBytesInFlight, so we can throttle the fetching speed to not result in OOM. Perhaps for very large shuffles we don't need to send the bytes for each block; we can send whether they are zero or not (which can be tracked via a compressed bitmap that can be tiny).

The other thing we do need is the location of blocks. This is actually just O(n) because we just need to know where the map was run.
Re: Eliminate copy while sending data : any Akka experts here ?
> b) Instead of pulling this information, push it to executors as part
> of task submission. (What Patrick mentioned ?)
> (1) a.1 from above is still an issue for this.

I don't understand what problem a.1 is. In this case, we don't need to do caching, right?

> (2) Serialized task size is also a concern : we have already seen
> users hitting akka limits for task size - this will be an additional
> vector which might exacerbate it.

This would add only a small, constant amount of data to each task. It's strictly better than before: if the map output status array was size M x R, we previously sent a single akka message of size M x R to every node, which basically scales quadratically with the size of the RDD. The new approach is constant per task, and the total amount of data sent over the wire is likely much less.

- Patrick
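Patrick's scaling argument, in numbers. The task counts, node count, and the assumption of one byte per compressed block size are illustrative, not measurements:

```scala
// Back-of-the-envelope comparison of the two schemes, assuming one byte
// per compressed block size (illustrative figures, not measurements).
val m = 10000L      // map tasks
val r = 10000L      // reduce tasks
val nodes = 200L

// Old scheme: the full M x R status array goes to every node.
val pushWholeArray = m * r * nodes   // bytes over the wire (20 GB here)

// New scheme: each reduce task gets only the M entries relevant to it,
// so the total is M bytes for each of the R tasks.
val pushPerTask = m * r              // 100 MB here

// The saving is exactly the node fan-out factor.
val factorSaved = pushWholeArray / pushPerTask
```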
Re: Eliminate copy while sending data : any Akka experts here ?
We had considered both approaches (if I understood the suggestions right):

a) Pulling only the map output statuses for tasks which run on the reducer, by modifying the Actor. (Probably along the lines of what Aaron described?)
The performance implications of this were bad:
1) We can't cache the serialized result anymore (caching it makes no sense, rather).
2) The number of requests to the master will go from num_executors to num_reducers - the latter can be orders of magnitude higher than the former.

b) Instead of pulling this information, push it to executors as part of task submission. (What Patrick mentioned ?)
(1) a.1 from above is still an issue for this.
(2) Serialized task size is also a concern: we have already seen users hitting akka limits for task size - this will be an additional vector which might exacerbate it.
Our jobs are not hitting this yet though !

I was hoping there might be something in akka itself to alleviate this - but if not, we can solve it within the context of spark.

Currently, we have worked around it by using a broadcast variable when the serialized size is above some threshold, so that our immediate concerns are unblocked :-)
But a better solution would be greatly welcomed !
Maybe we can unify it with large serialized tasks as well ...

Btw, I am not sure what the higher cost of BlockManager referred to is, Aaron - do you mean the cost of persisting the serialized map outputs to disk ?

Regards,
Mridul

On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell wrote:
> Yeah I created a JIRA a while back to piggy-back the map status info
> on top of the task (I honestly think it will be a small change). There
> isn't a good reason to broadcast the entire array and it can be an
> issue during large shuffles.
>
> - Patrick
>
> On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson wrote:
>> I don't know of any way to avoid Akka doing a copy, but I would like to
>> mention that it's on the priority list to piggy-back only the map statuses
>> relevant to a particular map task on the task itself, thus reducing the
>> total amount of data sent over the wire by a factor of N for N physical
>> machines in your cluster. Ideally we would also avoid Akka entirely when
>> sending the tasks, as these can get somewhat large and Akka doesn't work
>> well with large messages.
>>
>> Do note that your solution of using broadcast to send the map tasks is very
>> similar to how the executor returns the result of a task when it's too big
>> for akka. We were thinking of refactoring this too, as using the block
>> manager has much higher latency than a direct TCP send.
>>
>> On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan wrote:
>>> Our current hack is to use Broadcast variables when the serialized
>>> statuses are above some (configurable) size, and have the workers
>>> directly pull them from the master.
>>> This is a workaround, so it would be great if there was a
>>> better/principled solution.
>>>
>>> Please note that the responses are going to different workers
>>> requesting the output statuses for shuffle (after map) - so I'm not
>>> sure if back pressure buffers, etc. would help.
>>>
>>> Regards,
>>> Mridul
>>>
>>> On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan wrote:
>>> > Hi,
>>> >
>>> > While sending the map output tracker result, the same serialized byte
>>> > array is sent multiple times - but the akka implementation copies it
>>> > to a private byte array within ByteString for each send.
>>> > Caching a ByteString instead of Array[Byte] did not help, since akka
>>> > does not support special-casing ByteString : it serializes the
>>> > ByteString, and copies the result out to an array before creating a
>>> > ByteString out of it (for Array[Byte], serializing is thankfully simply
>>> > returning the same array - so one copy only).
>>> >
>>> > Given the need to send immutable data a large number of times, is there
>>> > any way to do it in akka without copying internally in akka ?
>>> >
>>> > To see how expensive it is: for 200 nodes with a large number of
>>> > mappers and reducers, the status becomes something like 30 MB for us -
>>> > and pulling this about 200 to 300 times results in OOM due to the
>>> > large number of copies sent out.
>>> >
>>> > Thanks,
>>> > Mridul
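The per-send copy Mridul describes can be illustrated with a toy model. This is not Akka's actual API - just a sketch of why caching the serialized `Array[Byte]` still incurs one fresh copy per message when the transport copies defensively:

```scala
// Toy model: the transport defensively copies the payload into an
// immutable buffer on every send, so even a cached array is copied
// once per recipient - N sends means N full copies of the payload.
class CopyingTransport {
  var copiesMade = 0

  private def wrap(payload: Array[Byte]): Array[Byte] = {
    copiesMade += 1
    payload.clone() // stands in for ByteString's internal copy
  }

  def send(cached: Array[Byte], recipients: Int): Unit =
    (1 to recipients).foreach(_ => wrap(cached))
}
```

With a 30 MB payload and 300 recipients, this model copies ~9 GB of transient data, which is the failure mode described in the thread.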
Re: Eliminate copy while sending data : any Akka experts here ?
Yeah, I created a JIRA a while back to piggy-back the map status info on top of the task (I honestly think it will be a small change). There isn't a good reason to broadcast the entire array and it can be an issue during large shuffles.

- Patrick

On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson wrote:
> I don't know of any way to avoid Akka doing a copy, but I would like to
> mention that it's on the priority list to piggy-back only the map statuses
> relevant to a particular map task on the task itself, thus reducing the
> total amount of data sent over the wire by a factor of N for N physical
> machines in your cluster. Ideally we would also avoid Akka entirely when
> sending the tasks, as these can get somewhat large and Akka doesn't work
> well with large messages.
>
> Do note that your solution of using broadcast to send the map tasks is very
> similar to how the executor returns the result of a task when it's too big
> for akka. We were thinking of refactoring this too, as using the block
> manager has much higher latency than a direct TCP send.
>
> On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan wrote:
>> Our current hack is to use Broadcast variables when the serialized
>> statuses are above some (configurable) size, and have the workers
>> directly pull them from the master.
>> This is a workaround, so it would be great if there was a
>> better/principled solution.
>>
>> Please note that the responses are going to different workers
>> requesting the output statuses for shuffle (after map) - so I'm not
>> sure if back pressure buffers, etc. would help.
>>
>> Regards,
>> Mridul
>>
>> On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan wrote:
>> > Hi,
>> >
>> > While sending the map output tracker result, the same serialized byte
>> > array is sent multiple times - but the akka implementation copies it
>> > to a private byte array within ByteString for each send.
>> > Caching a ByteString instead of Array[Byte] did not help, since akka
>> > does not support special-casing ByteString : it serializes the
>> > ByteString, and copies the result out to an array before creating a
>> > ByteString out of it (for Array[Byte], serializing is thankfully simply
>> > returning the same array - so one copy only).
>> >
>> > Given the need to send immutable data a large number of times, is there
>> > any way to do it in akka without copying internally in akka ?
>> >
>> > To see how expensive it is: for 200 nodes with a large number of
>> > mappers and reducers, the status becomes something like 30 MB for us -
>> > and pulling this about 200 to 300 times results in OOM due to the
>> > large number of copies sent out.
>> >
>> > Thanks,
>> > Mridul
Re: Eliminate copy while sending data : any Akka experts here ?
I don't know of any way to avoid Akka doing a copy, but I would like to mention that it's on the priority list to piggy-back only the map statuses relevant to a particular map task on the task itself, thus reducing the total amount of data sent over the wire by a factor of N for N physical machines in your cluster. Ideally we would also avoid Akka entirely when sending the tasks, as these can get somewhat large and Akka doesn't work well with large messages.

Do note that your solution of using broadcast to send the map tasks is very similar to how the executor returns the result of a task when it's too big for akka. We were thinking of refactoring this too, as using the block manager has much higher latency than a direct TCP send.

On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan wrote:
> Our current hack is to use Broadcast variables when the serialized
> statuses are above some (configurable) size, and have the workers
> directly pull them from the master.
> This is a workaround, so it would be great if there was a
> better/principled solution.
>
> Please note that the responses are going to different workers
> requesting the output statuses for shuffle (after map) - so I'm not
> sure if back pressure buffers, etc. would help.
>
> Regards,
> Mridul
>
> On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan wrote:
> > Hi,
> >
> > While sending the map output tracker result, the same serialized byte
> > array is sent multiple times - but the akka implementation copies it
> > to a private byte array within ByteString for each send.
> > Caching a ByteString instead of Array[Byte] did not help, since akka
> > does not support special-casing ByteString : it serializes the
> > ByteString, and copies the result out to an array before creating a
> > ByteString out of it (for Array[Byte], serializing is thankfully simply
> > returning the same array - so one copy only).
> >
> > Given the need to send immutable data a large number of times, is there
> > any way to do it in akka without copying internally in akka ?
> >
> > To see how expensive it is: for 200 nodes with a large number of
> > mappers and reducers, the status becomes something like 30 MB for us -
> > and pulling this about 200 to 300 times results in OOM due to the
> > large number of copies sent out.
> >
> > Thanks,
> > Mridul
Re: Eliminate copy while sending data : any Akka experts here ?
Our current hack is to use Broadcast variables when the serialized statuses are above some (configurable) size, and have the workers directly pull them from the master. This is a workaround, so it would be great if there was a better/principled solution.

Please note that the responses are going to different workers requesting the output statuses for shuffle (after map) - so I'm not sure if back pressure buffers, etc. would help.

Regards,
Mridul

On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan wrote:
> Hi,
>
> While sending the map output tracker result, the same serialized byte
> array is sent multiple times - but the akka implementation copies it
> to a private byte array within ByteString for each send.
> Caching a ByteString instead of Array[Byte] did not help, since akka
> does not support special-casing ByteString : it serializes the
> ByteString, and copies the result out to an array before creating a
> ByteString out of it (for Array[Byte], serializing is thankfully simply
> returning the same array - so one copy only).
>
> Given the need to send immutable data a large number of times, is there
> any way to do it in akka without copying internally in akka ?
>
> To see how expensive it is: for 200 nodes with a large number of
> mappers and reducers, the status becomes something like 30 MB for us -
> and pulling this about 200 to 300 times results in OOM due to the
> large number of copies sent out.
>
> Thanks,
> Mridul
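The numbers in the original message make the OOM easy to check by hand: a ~30 MB status array copied once per pull, for 200 to 300 pulls, produces several gigabytes of transient copies on the master. A sketch of the arithmetic only (the 250-pull figure is a midpoint of the range quoted above):

```scala
// Total bytes copied if each pull triggers one fresh copy of the
// serialized statuses, as described for Akka's per-send ByteString copy.
object CopyCost {
  def totalCopiedBytes(statusBytes: Long, pulls: Long): Long =
    statusBytes * pulls

  def main(args: Array[String]): Unit = {
    val statusBytes = 30L * 1024 * 1024 // ~30 MB per message
    val copied = totalCopiedBytes(statusBytes, 250)
    // ~7 GB of copies for a single wave of fetches - enough to OOM a
    // master whose heap also holds the statuses themselves.
    println(s"~${copied / (1024L * 1024 * 1024)} GB copied for 250 pulls")
  }
}
```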