I read the Hive-on-Tez doc again (https://cwiki.apache.org/confluence/display/Hive/Hive+on+Tez). It says: "Tez allows for small datasets to be handled entirely in memory, while no such optimization is available in map-reduce. Many warehousing queries sort or aggregate small datasets after the heavy lifting is done. These would benefit from an in memory shuffle." I am confused: does this mean the map outputs stay in memory? You said map outputs are eventually flushed to local disk. Do these two statements conflict?
Thank you~

Maria.

At 2016-04-15 15:49:08, "Maria" <[email protected]> wrote:
>
>Thank you so much, Rajesh. After reading your explanation of questions 3) and 4), I have a few more questions:
>(1) Can the way Tez stores map outputs be understood like MR's, i.e. is there a ring buffer that stores the map output first?
>(2) For MRR, are the first reduce vertex's outputs stored on disk? I traced the logs, but could not tell.
>(3) As the picture at https://tez.apache.org/index.html shows, throughout a Tez DAG only a little data needs to be written to disk. Does this mean only a little data is written to HDFS?
>
>best wishes & thank you
>
>Maria~.
>
>At 2016-04-15 07:26:59, "Rajesh Balamohan" <[email protected]> wrote:
>
>Answers inline
>
>1) How should I understand "pipelined shuffle"? Is it because of the pipelined sort? I found some comments about pipelined shuffle in ShuffleScheduler.copySucceeded(), but still cannot fully understand them:
> * In case of pipelined shuffle, it is quite possible that fetchers pulled the FINAL_UPDATE spill in advance due to smaller output size. In such scenarios, we need to wait until we retrieve all spill
> * details to claim success.
>Can you please explain the meaning in more detail?
>
>>> In case of ordered outputs, data becomes available to the downstream vertex only when data generation (sort + spill + final merge, etc.) is complete. With pipelined shuffle, data can be made available as and when a sorted segment is available from PipelinedSorter. It also avoids the expensive final merge operation on the producer side. In short, as data is being generated in the producer, it can be made available to the downstream vertex for downloading. When overall data generation is over, a FINAL_UPDATE event is sent out.
>
>>> It is a similar case with unordered partitioned outputs, where data could otherwise be made available only when the entire output is written out.
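For readers who want to experiment: pipelined shuffle is opt-in, controlled by runtime properties. A minimal sketch, assuming the property names defined in TezRuntimeConfiguration in the source tree linked later in this thread (verify the names and defaults against your Tez version):

```properties
# Announce sorted spills to consumers as they are produced, instead of
# only after the final merge (FINAL_UPDATE is sent when generation ends).
tez.runtime.pipelined-shuffle.enabled=true

# Pipelined shuffle skips the expensive final merge on the producer side.
tez.runtime.enable.final-merge.in.output=false
```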
>>> With pipelined shuffle, as and when the data for a partition is generated, an event is sent downstream for consumption.
>
>>> It is quite possible that the data generated in the final stage is quite small compared with the previous segments, while multiple fetchers are used downstream for downloading the data. For example, assume 5 events are generated (4 normal events + 1 FINAL_UPDATE event) and 10 threads are available downstream. In that case all segments could theoretically be downloaded in parallel, and the data pertaining to the FINAL_UPDATE event could finish downloading first (due to the smaller amount of data, e.g. 1 MB as opposed to 1000 MB in the other segments). In such cases, fetchers have to wait until all segments are downloaded, to prevent the downstream task from proceeding with a partially downloaded dataset, which could lead to correctness issues.
>
>2) Are there any other shuffle modes besides pipelined shuffle? The legacy MapReduce shuffle? (I know that Tez borrows much of the MR shuffle.)
>
>>> You can explore ordered shuffle in
>>> https://github.com/apache/tez/tree/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped
>>> and unordered shuffle in
>>> https://github.com/apache/tez/tree/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl
>
>3) Where is the map output data stored? How can its storage be controlled? Are there parameters for that?
>
>>> Map output is stored on disk (in one of the directories listed in yarn.nodemanager.local-dirs). See
>>> https://github.com/apache/tez/blob/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
>
>4) If the map output is stored in memory, how do custom vertices and tasks fetch it from memory? And if we do not re-use containers, who manages the map outputs?
>
>>> Map outputs are stored on disk. Higher-level programs can choose to cache data using ObjectRegistry (e.g. hashtable loading in
>>> https://github.com/apache/hive/blob/26b5c7b56a4f28ce3eabc0207566cce46b29b558/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java#L179).
>>> In that case, when tasks are scheduled on the same container again, they check whether the data is already present locally in memory and, if not, download it from the remote side again. On the fetcher side, depending on the amount of memory available, downloaded data can be kept in memory or on disk for processing.
>
>5) Does one fetcher correspond to one map output? And does a fetcher pull all the data produced by one map output in a single pass?
>
>>> A set of fetcher threads is allocated in the downstream task; they can download data from different tasks of the source vertex, based on the allocations made by the shuffle manager.
>
>On Thu, Apr 14, 2016 at 9:58 PM, Maria <[email protected]> wrote:
>
>Hi, all:
>I have several questions about the Tez shuffle stage: [questions 1)-5) quoted with answers inline above]
>
>Any reply will be much appreciated.
>
>Maria~.
>
>--
>~Rajesh.B
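The "wait until we retrieve all spill details to claim success" rule from ShuffleScheduler.copySucceeded(), discussed at the top of this thread, can be sketched in isolation. This is a toy model, not Tez's actual code: the class, the method signature, and the assumption that the FINAL_UPDATE spill carries the highest spill id (so seeing it reveals the total spill count) are all invented for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SpillTracker {
    // Spill ids fetched so far, per source task attempt.
    private final Map<String, Set<Integer>> fetched = new HashMap<>();
    // Total spill count per attempt, known only once FINAL_UPDATE is seen.
    private final Map<String, Integer> expected = new HashMap<>();

    /**
     * Record one fetched spill; isFinal marks the FINAL_UPDATE spill.
     * Returns true only when the attempt's output is fully fetched.
     */
    public boolean copySucceeded(String attempt, int spillId, boolean isFinal) {
        fetched.computeIfAbsent(attempt, a -> new HashSet<>()).add(spillId);
        if (isFinal) {
            // Assumption: FINAL_UPDATE carries the highest spill id,
            // so it tells us how many spills exist in total.
            expected.put(attempt, spillId + 1);
        }
        Integer total = expected.get(attempt);
        // Success is claimed only after FINAL_UPDATE has been seen AND
        // every spill has been fetched, regardless of arrival order.
        return total != null && fetched.get(attempt).size() == total;
    }

    public static void main(String[] args) {
        SpillTracker t = new SpillTracker();
        // The small FINAL_UPDATE spill (id 2) downloads first ...
        System.out.println(t.copySucceeded("attempt_0", 2, true));  // false: spills 0,1 pending
        System.out.println(t.copySucceeded("attempt_0", 0, false)); // false: spill 1 pending
        System.out.println(t.copySucceeded("attempt_0", 1, false)); // true: all 3 fetched
    }
}
```

This mirrors why a fetcher that happens to grab the small FINAL_UPDATE segment early cannot declare the attempt done: the downstream task would otherwise proceed on a partial dataset.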
