[
https://issues.apache.org/jira/browse/TEZ-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14043046#comment-14043046
]
Gopal V edited comment on TEZ-1157 at 6/25/14 5:05 AM:
-------------------------------------------------------
Ignoring the HDFS thread above, the task needs to know 4 variables to determine
whether leader election within a node might be a good idea.
* parallelism of the destination task
* parallelism of the source task
* size of the shuffle output
* number of partitions
The broadcast case is a special case where the number of partitions is always 1
(partition 0 of the map output, i.e. sourceIndex=0), which is the only case where
the optimization applies unconditionally (the dynamically partitioned hash join
involves similar bottlenecks, but is part of Hive's custom plugins).
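The mechanics of "leader election within a node" are not pinned down in this
comment; one minimal way to get exactly one downloader per machine is an
exclusive lock on a well-known local file. All class names and paths below are
hypothetical illustration, not Tez API:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Sketch of node-local leader election via an exclusive file lock.
// The lock path and layout are invented for illustration.
public class NodeLocalLeader {
    /**
     * Try to become the node-local leader for a given shuffle input.
     * Exactly one JVM on the machine wins the lock and downloads the data;
     * the others fall back to waiting for the shared local copy.
     *
     * @return the held lock if this task is the leader, or null if another
     *         task on the node already holds it.
     */
    static FileLock tryLead(Path lockFile) throws IOException {
        FileChannel ch = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock = ch.tryLock();   // non-blocking: null if already held
        if (lock == null) {
            ch.close();                 // follower: poll for the shared copy
        }
        return lock;
    }

    public static void main(String[] args) throws IOException {
        Path p = Paths.get(System.getProperty("java.io.tmpdir"),
                           "broadcast-src0.lock");
        FileLock lead = tryLead(p);
        System.out.println(lead != null ? "leader" : "follower");
        if (lead != null) {
            lead.release();
        }
    }
}
```

A file lock keeps the election entirely node-local with no coordination
traffic, at the cost of needing a follower-side wait/retry path for the
shared copy to appear.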
Pessimistically (i.e. no container reuse), the IO load on each source task
should be (destination-count * shuffle-output-size).
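To make the pessimistic formula concrete (the numbers are purely illustrative):

```java
// Pessimistic IO load on a source task: every destination fetches its own
// copy (no container reuse, no per-node sharing). Numbers are illustrative.
public class PessimisticLoad {
    static long ioLoadBytes(int destinationCount, long shuffleOutputBytes) {
        return destinationCount * shuffleOutputBytes;
    }

    public static void main(String[] args) {
        long out = 200L << 20;                 // 200 MiB broadcast output
        long total = ioLoadBytes(1000, out);   // 1000 destination tasks
        System.out.println(total >> 30);       // 195 (~195 GiB served)
    }
}
```

With per-node sharing, the multiplier drops from destination-count to
node-count, since only one task per machine performs the fetch.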
The source task parallelism is clearly visible to the downstream task as the
number of total inputs to wait for.
The destination task parallelism can be communicated as part of the TaskSpec,
before starting the task.
The shuffle output size is the only data item that needs to be sent over the
wire as part of the ShufflePayload message, so that the receiving task can
interpret it to determine whether the contents should be shared.
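Putting the four variables together, the receiving-side check might look like
the sketch below. The class and method names are hypothetical, not Tez API,
and the 64 MiB threshold is an arbitrary illustrative cutoff:

```java
// Hypothetical sketch of the per-node sharing decision described above.
// None of these names are real Tez classes; they only illustrate the logic.
public class SharedFetchDecision {

    /**
     * Decide whether tasks on this node should elect a leader and share
     * one downloaded copy of the upstream output.
     *
     * @param numPartitions      partitions in the source output
     * @param sourceParallelism  source tasks (visible as the input count)
     * @param destParallelism    destination tasks (carried in the TaskSpec)
     * @param shuffleOutputSize  bytes, as reported in the shuffle payload
     */
    static boolean shouldShare(int numPartitions,
                               int sourceParallelism,
                               int destParallelism,
                               long shuffleOutputSize) {
        // Broadcast: every destination reads the same single partition
        // (sourceIndex == 0), so sharing applies unconditionally.
        if (numPartitions == 1) {
            return true;
        }
        // Otherwise, only bother when the pessimistic fan-out load on the
        // source (destParallelism * shuffleOutputSize) is large; 64 MiB
        // here is an arbitrary illustrative cutoff.
        final long THRESHOLD = 64L << 20;
        return (long) destParallelism * shuffleOutputSize > THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.println(shouldShare(1, 10, 1000, 1L << 20));   // true
        System.out.println(shouldShare(200, 10, 1000, 1L << 10)); // false
    }
}
```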
The ideal scenario is for the data written out by a task to be in exactly the
format used by the shuffle handler (for future use), so that file space can be
reserved based on the raw input size provided to the dir-allocator.
> Optimize broadcast :- Tasks pertaining to same job in same machine should not
> download multiple copies of broadcast data
> ------------------------------------------------------------------------------------------------------------------------
>
> Key: TEZ-1157
> URL: https://issues.apache.org/jira/browse/TEZ-1157
> Project: Apache Tez
> Issue Type: Sub-task
> Reporter: Rajesh Balamohan
> Assignee: Rajesh Balamohan
> Labels: performance
> Attachments: TEZ-1152.WIP.patch
>
>
> Currently tasks (belonging to same job) running in the same machine download
> its own copy of broadcast data. Optimization could be to download one copy
> in the machine, and the rest of the tasks can refer to this downloaded copy.
--
This message was sent by Atlassian JIRA
(v6.2#6252)