[ 
https://issues.apache.org/jira/browse/TEZ-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14043046#comment-14043046
 ] 

Gopal V edited comment on TEZ-1157 at 6/25/14 5:05 AM:
-------------------------------------------------------

Ignoring the HDFS thread above, the task needs to know 4 variables to determine 
whether leader election within a node might be a good idea:

* parallelism of the destination task
* parallelism of the source task
* size of the shuffle output
* number of partitions 

The broadcast case is a special case where the number of partitions is always 1 
(partition 0 of the map output - sourceIndex=0); this is the only case where 
the optimization applies unconditionally (the dynamically partitioned hash join 
involves similar bottlenecks, but is part of Hive's custom plugins).

Pessimistically (i.e. no container reuse), the IO load on each source task 
should be (destination-count * shuffle-output-size).
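To make the decision concrete, here is a minimal sketch of the heuristic as described so far. All class, method, and threshold names here are hypothetical illustrations, not actual Tez code:

```java
// Hypothetical sketch of the node-level leader-election heuristic.
// None of these names exist in the Tez codebase; they only illustrate
// how the four variables listed above could feed the decision.
public class BroadcastLeaderHeuristic {

    /**
     * Decide whether tasks on the same node should elect a leader to
     * download the shared shuffle output once, instead of each task
     * fetching its own copy.
     */
    static boolean shouldElectLeader(int destParallelism,
                                     int srcParallelism,
                                     long shuffleOutputBytes,
                                     int numPartitions) {
        // Broadcast special case: one partition (sourceIndex=0) means every
        // destination task fetches identical bytes, so sharing always wins.
        if (numPartitions == 1) {
            return true;
        }
        // Pessimistic IO load on each source task (no container reuse):
        // destination-count * shuffle-output-size.
        long ioLoadPerSource = (long) destParallelism * shuffleOutputBytes;
        // Illustrative threshold: share when a source task would otherwise
        // serve more than 1 GB of repeated reads.
        return ioLoadPerSource > (1L << 30);
    }

    public static void main(String[] args) {
        // Broadcast: 1 partition -> always share.
        System.out.println(shouldElectLeader(1000, 10, 100L << 20, 1));
        // Small scatter-gather output: not worth coordinating.
        System.out.println(shouldElectLeader(10, 10, 1L << 20, 10));
    }
}
```

For example, with 1000 destination tasks and a 100 MB broadcast output, each source task would pessimistically serve ~100 GB of repeated reads, which is exactly the load the election is meant to avoid.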

The source task parallelism is clearly visible to the downstream task as the 
number of total inputs to wait for.

The destination task parallelism can be communicated as part of the TaskSpec, 
before starting the task.

The shuffle output size is the only data item that needs to be sent over the 
wire as part of the ShufflePayload message, so that the receiving task can 
interpret it and determine whether the contents should be shared or not.
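Putting the three signals together from the receiving task's point of view, the flow could look roughly like this. The `ShufflePayload` stand-in and all field names below are invented for illustration and do not match the actual Tez protobuf definitions:

```java
// Hypothetical receiving-task view of the three signals described above:
// source parallelism from the input count, destination parallelism from the
// TaskSpec, and the output size carried on the payload itself.
public class ShufflePayloadSketch {

    // Minimal stand-in for the payload message; only outputSizeBytes is the
    // extra item that would be sent over the wire.
    static final class ShufflePayload {
        final String host;
        final int sourceIndex;
        final long outputSizeBytes;

        ShufflePayload(String host, int sourceIndex, long outputSizeBytes) {
            this.host = host;
            this.sourceIndex = sourceIndex;
            this.outputSizeBytes = outputSizeBytes;
        }
    }

    /**
     * The receiving task combines the payload's output size with the two
     * parallelism figures it already knows to decide whether the fetched
     * contents should be shared on the node.
     */
    static boolean shouldShare(ShufflePayload payload,
                               int totalInputs,        // source parallelism
                               int destParallelism) {  // from the TaskSpec
        long pessimisticLoad = (long) destParallelism * payload.outputSizeBytes;
        return pessimisticLoad > (1L << 30); // illustrative threshold only
    }

    public static void main(String[] args) {
        // 200 MB broadcast output fetched by 1000 destination tasks.
        ShufflePayload p = new ShufflePayload("node1", 0, 200L << 20);
        System.out.println(shouldShare(p, 50, 1000));
    }
}
```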

The ideal scenario is for the data written out by a task to be in exactly the 
format used by the shuffle handler (for future use), so that file space can be 
reserved based on the raw input size provided to the dir-allocator.



> Optimize broadcast :- Tasks pertaining to same job in same machine should not 
> download multiple copies of broadcast data
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: TEZ-1157
>                 URL: https://issues.apache.org/jira/browse/TEZ-1157
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>              Labels: performance
>         Attachments: TEZ-1152.WIP.patch
>
>
> Currently tasks (belonging to same job) running in the same machine download 
> its own copy of broadcast data.  Optimization could be to  download one copy 
> in the machine, and the rest of the tasks can refer to this downloaded copy.



--
This message was sent by Atlassian JIRA
(v6.2#6252)