Support for sort buffers larger than 2 GB and for PipelinedShuffle was added in 0.7.x. If you are using 0.5.3, the 2 GB restriction applies to PipelinedSorter as well.
If you are using Hive + Tez, it is recommended to use Hive's container setting, "hive.tez.container.size=8192", which internally sets the relevant Tez settings. For tuning sort buffers, you can use "tez.runtime.io.sort.mb".

~Rajesh.B

On Thu, Jun 4, 2015 at 5:59 AM, [email protected] <[email protected]> wrote:

> How do I enable PipelinedShuffle and PipelinedSorter in Tez 0.5.3 on Hive
> 1.1.0?
> If I use PipelinedSorter, should I configure tez.task.resource.memory.mb=8192
> and tez.runtime.io.sort.mb=2048, or should I configure mapreduce.map.memory.mb,
> mapreduce.reduce.memory.mb, and mapreduce.task.io.sort.mb?
>
> ------------------------------
> [email protected]
>
> *From:* Rajesh Balamohan <[email protected]>
> *Date:* 2015-06-03 18:20
> *To:* user <[email protected]>
> *Subject:* Re: Re: What is the difference between PipelinedSorter and
> DefaultSorter?
> To add to the previous mail,
>
> Containers of 8 GB are not uncommon these days, but with DefaultSorter
> we could allocate only a < 2 GB sort buffer. For example, I ran a very
> small-scale terasort (40 GB) on a smaller cluster, in a queue with limited
> resources, for testing purposes.
>
> Runtime (8 GB container, 20 mappers, 10 reducers, 1500 MB sort buffer,
> DefaultSorter): 278 seconds (198 seconds in map phase); basically every
> mapper was spilling at least once
> Runtime (8 GB container, 20 mappers, 10 reducers, 3200 MB sort buffer,
> PipelinedSorter): 195 seconds (95 seconds in map phase)
>
> This is just a synthetic workload to show the kind of impact spills can
> have on a specific job's runtime.
>
> PipelinedSorter would be useful for skew as well. E.g., the tpcds_query_17
> query at 10 TB scale in Hive used to generate a huge amount of data in one
> of the intermediate stages in earlier releases of Hive. Providing more sort
> buffer in such cases could bring down the spill cost considerably.
>
> Another reason for switching to PipelinedSorter is that it makes it
> possible to support the initial versions of PipelinedShuffle (i.e., as and
> when a sort span spills, the downstream vertex can be notified and the data
> can be consumed by downstream tasks). This will be useful when there is
> data skew and a couple of mappers end up generating huge amounts of data.
>
> ~Rajesh.B
>
> On Wed, Jun 3, 2015 at 7:52 AM, [email protected] <[email protected]>
> wrote:
>
>> Thank you!
>>
>> ------------------------------
>> [email protected]
>>
>> *From:* Rajesh Balamohan <[email protected]>
>> *Date:* 2015-06-03 10:43
>> *To:* user <[email protected]>
>> *Subject:* Re: What is the difference between PipelinedSorter and
>> DefaultSorter?
>> DefaultSorter is the same sorter implementation used in the MapReduce
>> world and is single-threaded. PipelinedSorter, on the other hand, takes a
>> divide-and-conquer approach and works on multiple sort spans, which can
>> be sorted by different threads. More details can be found in
>> http://people.apache.org/~gopalv/PipelinedSorter.pdf.
>>
>> It is not possible to increase sort.mb beyond 2 GB with the DefaultSorter
>> implementation. With PipelinedSorter, it is possible to allocate more
>> than 2 GB as sort buffer. This could be useful in scenarios where you
>> have large containers and can allocate more than 2 GB for the sort buffer
>> to avoid potential disk spills. It is possible to control the number of
>> threads allocated for sorting in PipelinedSorter using
>> "tez.runtime.pipelined.sorter.sort.threads" (defaults to 2). Setting this
>> to a much higher value might not be useful, as the benefit depends on the
>> number of processors available in the system and the number of containers
>> running on the system. Depending on workloads, 2-4 could be a sweet spot.
>> Starting with Tez 0.7, PipelinedSorter has been made the de facto sorter,
>> though users can switch back to DefaultSorter (the MapReduce-world
>> implementation) by setting "tez.runtime.sorter.class=LEGACY".
>>
>> ~Rajesh.B
>>
>> On Wed, Jun 3, 2015 at 7:18 AM, [email protected] <[email protected]>
>> wrote:
>>
>>> In OrderedPartitionedKVOutput, I see:
>>>
>>> if (this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS,
>>>     TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS_DEFAULT) > 1) {
>>>   sorter = new PipelinedSorter(getContext(), conf,
>>>       getNumPhysicalOutputs(),
>>>       memoryUpdateCallbackHandler.getMemoryAssigned());
>>> } else {
>>>   sorter = new DefaultSorter(getContext(), conf,
>>>       getNumPhysicalOutputs(),
>>>       memoryUpdateCallbackHandler.getMemoryAssigned());
>>> }
>>>
>>> So when tez.runtime.sort.threads is set to a value > 1, PipelinedSorter
>>> is chosen.
>>> ------------------------------
>>> [email protected]
>>>
>>
>> --
>> ~Rajesh.B
>
> --
> ~Rajesh.B

--
~Rajesh.B
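
The selection and sizing rules discussed in this thread can be summarized in a small stand-alone Java sketch. It does not use or reproduce any Tez class: SorterChoiceSketch and its methods are hypothetical names made up for illustration, and only the property names and the 2 GB limit are taken from the messages above. Two points are assumptions rather than facts from the thread: that an unset tez.runtime.sort.threads behaves like 1, and that a buffer of exactly 2048 MB counts as over the limit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Tez.
class SorterChoiceSketch {
    // Property names as quoted in this thread.
    static final String SORT_THREADS = "tez.runtime.sort.threads";
    static final String SORTER_CLASS = "tez.runtime.sorter.class";
    static final String SORT_MB = "tez.runtime.io.sort.mb";

    // 2 GB expressed in MB.
    static final int TWO_GB_IN_MB = 2048;

    // Rule from the quoted 0.5.x code: more than one sort thread selects PipelinedSorter.
    static String chooseSorterTez05(Map<String, String> conf) {
        // Treating an unset value as 1 is an assumption of this sketch.
        int threads = Integer.parseInt(conf.getOrDefault(SORT_THREADS, "1"));
        return threads > 1 ? "PipelinedSorter" : "DefaultSorter";
    }

    // Rule described for 0.7: PipelinedSorter unless the sorter class is set to LEGACY.
    static String chooseSorterTez07(Map<String, String> conf) {
        String value = conf.getOrDefault(SORTER_CLASS, "");
        return "LEGACY".equals(value) ? "DefaultSorter" : "PipelinedSorter";
    }

    // Per the thread: buffers of 2 GB or more need PipelinedSorter on 0.7.x or later.
    // Counting exactly 2048 MB as over the limit is an assumption of this sketch.
    static boolean sortBufferAllowed(String sorter, boolean tez07OrLater, int sortMb) {
        if (sortMb < TWO_GB_IN_MB) {
            return true;
        }
        return tez07OrLater && sorter.equals("PipelinedSorter");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(SORT_MB, "3200"); // buffer size used in the PipelinedSorter run above
        int sortMb = Integer.parseInt(conf.get(SORT_MB));

        String sorter07 = chooseSorterTez07(conf);
        System.out.println("0.7.x: " + sorter07 + ", " + sortMb + " MB allowed: "
                + sortBufferAllowed(sorter07, true, sortMb));

        String sorter05 = chooseSorterTez05(conf);
        System.out.println("0.5.x: " + sorter05 + ", " + sortMb + " MB allowed: "
                + sortBufferAllowed(sorter05, false, sortMb));
    }
}
```

The sketch returns plain strings instead of sorter instances so that it runs without Tez on the classpath; it is meant as a reading aid for the rules above, not as a description of how Tez validates its configuration.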
