Answers inline
1) how to understand "pipelined shuffle"? Does it is becase the pipeline
sort? I find some comments about pipelined shuffle in
ShuffleSchaduler.copySucceeded(),but still cannot fully understand:
* In case of pipelined shuffle, it is quite possible that fetchers
pulled the FINAL_UPDATE spill in advance due to smaller output size. In
such scenarios, we need to wait until we retrieve all spill
* details to claim success.
Can you please explain the meaning more?
>> In case of ordered outputs, only when the data generation
(sort+spill+final merge etc) is complete, data would be available to
downstream vertex. With pipelined shuffle, data can be made available as an
when a sorted segment is available from PipelinedSorter. Also, it avoids
the expensive final merge operation in the producer side. To put it short,
as the data is being genreated in producer, data can made available to
downstream for downloading. And When overall data generation is over,
FINAL_UPDATE event is sent out.
Similar case with undered partitioned outputs, where data could be made
available only when entire data is written out. With PipelinedShuffle, as
and when the data for a partition is generated, event is sent out
downstream for consuming.
It is quite possible that the data generated in the final stage is quite
small as compared with the previous segments. However, in the downstream
multiple fetchers would be used for downloading the data. For example,
assume 5 events are generated (4 normal events + 1 FINAL_UPDATE event) and
10 threads are available in downstream. In this case, all segments can be
theoritically be downloaded in parallel and it is quite possible that data
pertaining to FINAL_UPDATE event could get downloaded faster (could be due
to lesser amount of data. e.g 1 MB as opposed to 1000 MB in other
segments). In such cases, fetchers have to wait until all segments are
downloaded (to prevent downstream from proceeding with partially downloaded
dataset which could lead to correctness issues).
2) Are there any other shuffle mode besides pipelined shuffle? the legacy
mapreduce shuffle? (I know that tez borrows much of the MR shuffle.)
>> You can explore more about ordered shuffle in
https://github.com/apache/tez/tree/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped
and unordered shuffle in
https://github.com/apache/tez/tree/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl
3) Where is the map output data stored? how to control its storage,Is there
any parameters for that?
>> Map output is stored in disk (one of the directories of
yarn.nodemanager.local-dirs).
https://github.com/apache/tez/blob/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
4) If the map output stored in memory, how does custom vertex and tasks to
fetch them from memory? And if we do not re-use container,who manage map
outputs?
>> Map outputs are stored in disk. Higher level programs can choose to
cache using ObjectRegistry (e.g hashtable loading
https://github.com/apache/hive/blob/26b5c7b56a4f28ce3eabc0207566cce46b29b558/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java#L179).
In which case, when tasks are scheduled on the same container again, they
check if the data is locally present in memory and if not download from
remote again. In fetcher side, based on the amount of memory available,
downloaded data can be stored in memory/disk for processing.
5) Does one fetcher corresponds with one mapoutput? And a fetcher just
pull one-time of all the data produced by one map output?
>> Set of fetcher threads are allocated in downstream task and they can
download data from different tasks from the source vertex based on the
allocations from shuffle manager.
On Thu, Apr 14, 2016 at 9:58 PM, Maria <[email protected]> wrote:
>
> Hi, all:
> I have several questions about tez shuffle stage:
> 1) how to understand "pipelined shuffle"? Does it is becase the pipeline
> sort? I find some comments about pipelined shuffle in
> ShuffleSchaduler.copySucceeded(),but still cannot fully understand:
> * In case of pipelined shuffle, it is quite possible that fetchers
> pulled the FINAL_UPDATE spill in advance due to smaller output size. In
> such scenarios, we need to wait until we retrieve all spill
> * details to claim success.
> Can you please explain the meaning more?
>
> 2) Are there any other shuffle mode besides pipelined shuffle? the legacy
> mapreduce shuffle? (I know that tez borrows much of the MR shuffle.)
> 3) Where is the map output data stored? how to control its storage,Is
> there any parameters for that?
> 4) If the map output stored in memory, how does custom vertex and tasks to
> fetch them from memory? And if we do not re-use container,who manage map
> outputs?
> 5) Does one fetcher corresponds with one mapoutput? And a fetcher just
> pull one-time of all the data produced by one map output?
>
> Any reply will be much appreciated.
>
> Maria~.
--
~Rajesh.B