Filed https://issues.apache.org/jira/browse/TEZ-2950
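
For context on the spill count: the allocation figures in the MemoryDistributor log quoted below can be reproduced with a little arithmetic. This is only a sketch. The formula (weight each request by its scale ratio, then shrink everything proportionally to fit the unreserved heap) is inferred from the logged values, not taken from the Tez source, and the function name, variable names and the mapping of OrderedGroupedKVInput to the SORTED_MERGED_INPUT ratio are assumptions made for illustration.

```python
from fractions import Fraction
from math import floor

def weighted_allocations(heap_bytes, reserve_fraction, requests):
    """Return (total_available, {name: allocated_bytes}).

    requests: list of (name, requested_bytes, scale_ratio) tuples.
    Assumed formula, inferred from the logged numbers only.
    """
    reserve = Fraction(reserve_fraction)  # pass a string so 0.545 stays exact
    total_available = floor(heap_bytes * (1 - reserve))
    total_weight = sum(ratio for _, _, ratio in requests)
    # Each request is first scaled by its share of the total weight.
    scaled = {name: Fraction(req * ratio, total_weight)
              for name, req, ratio in requests}
    total_scaled = sum(scaled.values())
    # Then everything is shrunk proportionally to fit the available heap.
    allocations = {name: floor(s * total_available / total_scaled)
                   for name, s in scaled.items()}
    return total_available, allocations

# Values copied from the log; ratios 1 and 12 come from ScaleRatiosUsed.
HEAP = 3102212096
REQUESTS = [
    ("scope-6987", 104857600, 1),    # UnorderedPartitionedKVOutput
    ("scope-6974", 1861327360, 12),  # OrderedGroupedKVInput (assumed ratio)
    ("scope-6957", 1861327360, 12),  # OrderedGroupedKVInput (assumed ratio)
]

available, allocations = weighted_allocations(HEAP, "0.545", REQUESTS)
for name, size in allocations.items():
    print(name, size)
```

Under these assumptions the 100 MB request for the unordered partitioned output shrinks to 3305449 bytes (about 3.15 MB), which split across two buffers gives the sizePerBuffer=1652724 seen in the log. A buffer that small would go a long way towards explaining the thousands of spills.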

On Wed, Nov 18, 2015 at 2:56 PM, Rohini Palaniswamy <[email protected]> wrote:

> This approach is performing very badly: decompressing and merging a large
> number of spills takes a long time with 999 partitions. We have a join
> followed by a UNION. The join vertex uses UnorderedPartitionedKVWriter for
> its output, since the input to the union does not need to be sorted.
> Merging 8436 spills took about 30 minutes. I will go ahead and file a JIRA.
>
> 2015-11-18 21:01:25,904 [INFO] [main] |resources.MemoryDistributor|: InitialMemoryDistributor (isEnabled=true) invoked with: numInputs=2, numOutputs=1, JVM.maxFree=3102212096, allocatorClassName=org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor
> 2015-11-18 21:01:26,295 [INFO] [TezChild] |resources.MemoryDistributor|: InitialRequests=[scope-6987:OUTPUT:104857600:org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput], [scope-6974:INPUT:1861327360:org.apache.tez.runtime.library.input.OrderedGroupedKVInput], [scope-6957:INPUT:1861327360:org.apache.tez.runtime.library.input.OrderedGroupedKVInput]
> 2015-11-18 21:01:26,303 [INFO] [TezChild] |resources.WeightedScalingMemoryDistributor|: ScaleRatiosUsed=[PARTITIONED_UNSORTED_OUTPUT:1][UNSORTED_OUTPUT:1][UNSORTED_INPUT:1][SORTED_OUTPUT:12][SORTED_MERGED_INPUT:12][PROCESSOR:1][OTHER:1]
> 2015-11-18 21:01:26,307 [INFO] [TezChild] |resources.WeightedScalingMemoryDistributor|: InitialReservationFraction=0.5, AdditionalReservationFractionForIOs=0.045, finalReserveFractionUsed=0.545
> 2015-11-18 21:01:26,308 [INFO] [TezChild] |resources.WeightedScalingMemoryDistributor|: Scaling Requests. NumRequests: 3, numScaledRequests: 25, TotalRequested: 3827512320, TotalRequestedScaled: 1.7910685696E9, TotalJVMHeap: 3102212096, TotalAvailable: 1411506503, TotalRequested/TotalJVMHeap:1.23
> 2015-11-18 21:01:26,308 [INFO] [TezChild] |resources.MemoryDistributor|: Allocations=[scope-6987:org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput:OUTPUT:104857600:3305449], [scope-6974:org.apache.tez.runtime.library.input.OrderedGroupedKVInput:INPUT:1861327360:704100526], [scope-6957:org.apache.tez.runtime.library.input.OrderedGroupedKVInput:INPUT:1861327360:704100526]
> 2015-11-18 21:02:49,010 [INFO] [TezChild] |writers.UnorderedPartitionedKVWriter|: scope_6987: numBuffers=2, sizePerBuffer=1652724, skipBuffers=false, pipelinedShuffle=false, numPartitions=999
> ......
> 2015-11-18 21:21:03,353 [INFO] [UnorderedOutSpiller {scope_6987}] |writers.UnorderedPartitionedKVWriter|: scope_6987: Finished spill 8436
> 2015-11-18 21:21:04,236 [INFO] [TezChild] |task.TezTaskRunner|: Closing task, taskAttemptId=attempt_1444575566264_610936_1_28_000475_0
> ......
> 2015-11-18 21:21:04,238 [INFO] [TezChild] |writers.UnorderedPartitionedKVWriter|: scope_6987: Waiting for all spills to complete : Pending : 0
> 2015-11-18 21:21:04,238 [INFO] [TezChild] |writers.UnorderedPartitionedKVWriter|: scope_6987: All spills complete
> 2015-11-18 21:54:44,047 [INFO] [TezChild] |writers.UnorderedPartitionedKVWriter|: scope_6987: Finished final spill after merging : 8438 spills
> ....
>
>
>
>
> On Wed, Nov 18, 2015 at 2:40 PM, Siddharth Seth <[email protected]> wrote:
>
>> In case of UnorderedKVWriter (non-partitioned), a single file is used - to
>> which new entries are appended.
>>
>> For the partitioned case - using a single file is not as straightforward,
>> since the number of elements and size of each partition is not known up
>> front. Generating a single file per partition can cause an explosion in the
>> number of files, as well as the number of streams open in parallel (OOM).
>> The current partitioned writer writes data into the in-memory buffer and
>> then spills this into files with individual partitions consolidated
>> together.
>> Without pipelined shuffle - a single file needs to be generated for a
>> single task, which is where the merge step comes in - in case the buffer is
>> large. With pipelined shuffle - there's almost no extra cost, since there's
>> no final merge - and each element is written out exactly once.
>>
>> That said, optimizations are possible depending upon the use case. e.g. For
>> a small number of partitions - it's reasonable to write out a file per
>> partition. However, the ShuffleHandler and shuffle code will need to change
>> to handle this.
>>
>> Pipelined Shuffle/avoiding a final merge has some limitations in case of
>> failures and partial chunks being transferred over. It should be possible
>> to work around these by modifying the receiving side to process each input
>> only when all data for that source has been received.
>>
>> Either way, non-trivial changes are required to make this more efficient.
>>
>> In this particular case, how many partitions were generated, and what was
>> the size of the unordered output buffer? Increasing the buffer size for
>> this particular job can help mitigate the problem - maybe not with 8500
>> spills though.
>>
>>
>>
>> On Wed, Nov 18, 2015 at 1:32 PM, Rohini Palaniswamy <[email protected]> wrote:
>>
>> >   Came across a job which was taking a long time in
>> > UnorderedPartitionedKVWriter.mergeAll. Saw that it was decompressing and
>> > reading data from spill files (8500 spills) and then writing the final
>> > compressed merge file. Why do we need spill files for
>> > UnorderedPartitionedKVWriter? Why not just buffer and keep writing
>> > directly to the final file, which would save a lot of time?
>> >
>> > Regards,
>> > Rohini
>> >
>>
>
>
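
To make the cost discussed in this thread concrete, here is a toy model of the spill-then-merge behaviour described above for the non-pipelined case. It is a sketch only: ToyPartitionedWriter and everything in it are hypothetical names, it keeps spills in memory instead of on disk, and it ignores compression, so it shows the shape of the work and not the actual Tez implementation.

```python
from collections import defaultdict

class ToyPartitionedWriter:
    """Hypothetical stand-in for a partitioned, unsorted writer."""

    def __init__(self, num_partitions, buffer_limit_bytes):
        self.num_partitions = num_partitions
        self.buffer_limit_bytes = buffer_limit_bytes
        self.buffer = defaultdict(list)   # partition -> buffered records
        self.buffered_bytes = 0
        self.spills = []                  # one dict per spill "file"

    def write(self, partition, record):
        # Spill first if this record would overflow a non-empty buffer.
        if self.buffered_bytes and \
                self.buffered_bytes + len(record) > self.buffer_limit_bytes:
            self._spill()
        self.buffer[partition].append(record)
        self.buffered_bytes += len(record)

    def _spill(self):
        # Each spill holds one consolidated segment per partition.
        self.spills.append({p: b"".join(recs)
                            for p, recs in sorted(self.buffer.items())})
        self.buffer.clear()
        self.buffered_bytes = 0

    def close(self):
        """Final merge: re-read every spill and build one output per task."""
        if self.buffered_bytes:
            self._spill()
        merged = {p: b"".join(s.get(p, b"") for s in self.spills)
                  for p in range(self.num_partitions)}
        bytes_reread = sum(len(seg) for s in self.spills for seg in s.values())
        return merged, bytes_reread

# Ten 4-byte records, 3 partitions, an 8-byte buffer.
writer = ToyPartitionedWriter(num_partitions=3, buffer_limit_bytes=8)
for i in range(10):
    writer.write(i % 3, b"abcd")
merged, bytes_reread = writer.close()
print(len(writer.spills), bytes_reread)
```

In this model every byte is written once in a spill and then read and written again in close(), and the merge has to visit every spill for every partition. With pipelined shuffle the close() merge would not exist, and each record would be written exactly once.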
