To add to the previous mail,

Containers of 8 GB are not uncommon these days, but with DefaultSorter
we can allocate only a sort buffer of less than 2 GB. For example, I ran a
very small-scale terasort (40 GB) on a small cluster, in a queue with
limited resources, for testing purposes.

Runtime (8 GB container, 20 mappers, 10 reducers, 1500 MB sort buffer,
DefaultSorter)   : 278 seconds (198 seconds in map phase); basically every
mapper was spilling at least once
Runtime (8 GB container, 20 mappers, 10 reducers, 3200 MB sort buffer,
PipelinedSorter) : 195 seconds (95 seconds in map phase)

This is just a synthetic workload to show the kind of impact spills can
have on a specific job's runtime.
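
On the < 2 GB ceiling for DefaultSorter: as far as I understand it, the
buffer is a single byte array addressed with int offsets, so the size in MB
converted to bytes has to fit in a signed 32-bit int. A minimal sketch of
that arithmetic follows. The class and method names are made up for
illustration; this is not the Tez code.

```java
public class SortBufferLimit {
    // Largest MB value whose byte count still fits in a signed 32-bit int.
    // (2^31 - 1) >> 20 == 2047.
    static final int MAX_SORT_MB = Integer.MAX_VALUE >> 20;

    // Converts MB to bytes using int arithmetic, the way an int-indexed
    // byte[] would need it. Overflows for values above MAX_SORT_MB.
    static int toBytesAsInt(int sortMb) {
        return sortMb << 20;
    }

    // True if a buffer of this size could live in one Java byte array.
    static boolean fitsInSingleArray(int sortMb) {
        return sortMb > 0 && sortMb <= MAX_SORT_MB;
    }

    public static void main(String[] args) {
        for (int mb : new int[] {1500, 2047, 2048, 3200}) {
            System.out.println(mb + " MB -> " + toBytesAsInt(mb)
                + " bytes as int, fits=" + fitsInSingleArray(mb));
        }
    }
}
```

The 1500 MB run above fits in one such array, while the 3200 MB run does
not, which is why it needed PipelinedSorter.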

PipelinedSorter would be useful for skew as well. E.g., tpcds_query_17
@10 TB scale in Hive used to generate a huge amount of data in one of the
intermediate stages in earlier releases of Hive. Providing a larger sort
buffer in such cases could bring down the spill cost considerably.

Another reason for switching to PipelinedSorter is that it makes it
possible to support the initial versions of PipelinedShuffle (i.e., as and
when a sort-span spills, the downstream vertex can be notified and the data
can be consumed by downstream tasks). This will be useful when there is
data skew and a couple of mappers end up generating a huge amount of data.
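
To make the sort-span idea concrete, here is a toy sketch: split the
records into spans, sort each span on a pool thread, then merge the sorted
spans. This only illustrates the divide/conquer approach under simplified
assumptions (in-memory integers, no spilling, no partitions). All names are
hypothetical and it is not how PipelinedSorter is actually implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SpanSortSketch {
    // Sorts each span on a pool thread, then merges the sorted spans.
    static List<Integer> sortInSpans(List<Integer> records, int spanSize, int threads) {
        if (spanSize <= 0 || threads <= 0) {
            throw new IllegalArgumentException("spanSize and threads must be positive");
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            for (int start = 0; start < records.size(); start += spanSize) {
                int end = Math.min(start + spanSize, records.size());
                // Copy the span so each task sorts its own private list.
                List<Integer> span = new ArrayList<>(records.subList(start, end));
                futures.add(pool.submit(() -> {
                    Collections.sort(span);
                    return span;
                }));
            }
            List<List<Integer>> sortedSpans = new ArrayList<>();
            for (Future<List<Integer>> f : futures) {
                sortedSpans.add(f.get());
            }
            return merge(sortedSpans);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("span sort failed", e);
        } finally {
            pool.shutdown();
        }
    }

    // k-way merge with a min-heap of {value, spanIndex, offsetInSpan}.
    static List<Integer> merge(List<List<Integer>> spans) {
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < spans.size(); i++) {
            if (!spans.get(i).isEmpty()) {
                heap.add(new int[] {spans.get(i).get(0), i, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out.add(top[0]);
            List<Integer> span = spans.get(top[1]);
            int next = top[2] + 1;
            if (next < span.size()) {
                heap.add(new int[] {span.get(next), top[1], next});
            }
        }
        return out;
    }
}
```

In this picture, each sorted span is a unit that is complete on its own,
which is what would let a downstream consumer start on it before the whole
output is done.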

~Rajesh.B

On Wed, Jun 3, 2015 at 7:52 AM, [email protected] <[email protected]>
wrote:

> Thank you!
>
> ------------------------------
> [email protected]
>
>
> *From:* Rajesh Balamohan <[email protected]>
> *Date:* 2015-06-03 10:43
> *To:* user <[email protected]>
> *Subject:* Re: What is the difference between PipelinedSorter and
> DefaultSorter?
> DefaultSorter is the same sorter implementation used in the MapReduce
> world and is single-threaded. PipelinedSorter, on the other hand, uses a
> divide-and-conquer approach and works on multiple sort-spans, which can be
> sorted by different threads. More details can be found in
> http://people.apache.org/~gopalv/PipelinedSorter.pdf.
>
> It is not possible to increase sort.mb beyond 2 GB with the DefaultSorter
> implementation. With PipelinedSorter, it is possible to allocate more
> than 2 GB as sort buffer. This could be useful in scenarios where you have
> large containers and can allocate more than 2 GB for the sort buffer to
> avoid potential disk spills. It is possible to control the number of
> threads allocated for sorting in PipelinedSorter using
> "tez.runtime.pipelined.sorter.sort.threads" (defaults to 2). Setting this
> to a much higher value might not be useful, as it depends on the number of
> processors available in the system and the number of containers running on
> the system. Depending on workloads, 2-4 could be a sweet spot. Starting
> with Tez 0.7, PipelinedSorter has been made the default sorter, though
> users can switch back to DefaultSorter (the MapReduce-world
> implementation) by setting "tez.runtime.sorter.class=LEGACY".
>
> ~Rajesh.B
>
> On Wed, Jun 3, 2015 at 7:18 AM, [email protected] <[email protected]>
> wrote:
>
>> In OrderedPartitionedKVOutput, I see:
>>
>> if (this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS,
>>     TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS_DEFAULT) > 1) {
>>   sorter = new PipelinedSorter(getContext(), conf,
>>       getNumPhysicalOutputs(),
>>       memoryUpdateCallbackHandler.getMemoryAssigned());
>> } else {
>>   sorter = new DefaultSorter(getContext(), conf,
>>       getNumPhysicalOutputs(),
>>       memoryUpdateCallbackHandler.getMemoryAssigned());
>> }
>>
>> So setting tez.runtime.sort.threads > 1 will choose PipelinedSorter.
>> ------------------------------
>> [email protected]
>>
>
>
>
> --
> ~Rajesh.B
>
>


-- 
~Rajesh.B
