This approach is performing very badly: decompressing and merging a
large number of spills takes a long time with 999 partitions. We have a
join followed by a UNION. The join vertex uses
UnorderedPartitionedKVWriter for its output, as no sort is required on
the input to the union. Merging more than 8400 spills took over 30
minutes (21:21:04 to 21:54:44 in the log below). I will go ahead and
file a JIRA.

2015-11-18 21:01:25,904 [INFO] [main] |resources.MemoryDistributor|:
InitialMemoryDistributor (isEnabled=true) invoked with: numInputs=2,
numOutputs=1, JVM.maxFree=3102212096,
allocatorClassName=org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor
2015-11-18 21:01:26,295 [INFO] [TezChild]
|resources.MemoryDistributor|:
InitialRequests=[scope-6987:OUTPUT:104857600:org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput],
[scope-6974:INPUT:1861327360:org.apache.tez.runtime.library.input.OrderedGroupedKVInput],
[scope-6957:INPUT:1861327360:org.apache.tez.runtime.library.input.OrderedGroupedKVInput]
2015-11-18 21:01:26,303 [INFO] [TezChild]
|resources.WeightedScalingMemoryDistributor|:
ScaleRatiosUsed=[PARTITIONED_UNSORTED_OUTPUT:1][UNSORTED_OUTPUT:1][UNSORTED_INPUT:1][SORTED_OUTPUT:12][SORTED_MERGED_INPUT:12][PROCESSOR:1][OTHER:1]
2015-11-18 21:01:26,307 [INFO] [TezChild]
|resources.WeightedScalingMemoryDistributor|:
InitialReservationFraction=0.5,
AdditionalReservationFractionForIOs=0.045,
finalReserveFractionUsed=0.545
2015-11-18 21:01:26,308 [INFO] [TezChild]
|resources.WeightedScalingMemoryDistributor|: Scaling Requests.
NumRequests: 3, numScaledRequests: 25, TotalRequested: 3827512320,
TotalRequestedScaled: 1.7910685696E9, TotalJVMHeap: 3102212096,
TotalAvailable: 1411506503, TotalRequested/TotalJVMHeap:1.23
2015-11-18 21:01:26,308 [INFO] [TezChild]
|resources.MemoryDistributor|:
Allocations=[scope-6987:org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput:OUTPUT:104857600:3305449],
[scope-6974:org.apache.tez.runtime.library.input.OrderedGroupedKVInput:INPUT:1861327360:704100526],
[scope-6957:org.apache.tez.runtime.library.input.OrderedGroupedKVInput:INPUT:1861327360:704100526]
2015-11-18 21:02:49,010 [INFO] [TezChild]
|writers.UnorderedPartitionedKVWriter|: scope_6987: numBuffers=2,
sizePerBuffer=1652724, skipBuffers=false, pipelinedShuffle=false,
numPartitions=999
......
2015-11-18 21:21:03,353 [INFO] [UnorderedOutSpiller {scope_6987}]
|writers.UnorderedPartitionedKVWriter|: scope_6987: Finished spill
8436
2015-11-18 21:21:04,236 [INFO] [TezChild] |task.TezTaskRunner|:
Closing task, taskAttemptId=attempt_1444575566264_610936_1_28_000475_0
......
2015-11-18 21:21:04,238 [INFO] [TezChild]
|writers.UnorderedPartitionedKVWriter|: scope_6987: Waiting for all
spills to complete : Pending : 0
2015-11-18 21:21:04,238 [INFO] [TezChild]
|writers.UnorderedPartitionedKVWriter|: scope_6987: All spills
complete
2015-11-18 21:54:44,047 [INFO] [TezChild]
|writers.UnorderedPartitionedKVWriter|: scope_6987: Finished final
spill after merging : 8438 spills
....
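
A rough sanity check of the numbers in the log above, as a minimal Python
sketch. The input values are copied from the log; the scaling formula is an
assumption inferred from those values (weight each request by its
ScaleRatiosUsed entry, then shrink everything to fit TotalAvailable), not
taken from the Tez source, and the function and variable names are made up
for illustration.

```python
from datetime import datetime

# Values copied from the log above.
JVM_MAX_FREE = 3102212096
RESERVE_FRACTION = 0.545  # finalReserveFractionUsed
# (requested bytes, weight from ScaleRatiosUsed)
REQUESTS = [
    (104857600, 1),    # scope-6987, PARTITIONED_UNSORTED_OUTPUT
    (1861327360, 12),  # scope-6974, SORTED_MERGED_INPUT
    (1861327360, 12),  # scope-6957, SORTED_MERGED_INPUT
]


def scale_requests(requests, heap, reserve_fraction):
    """Assumed formula: weight each request, then scale to the available heap."""
    available = int(heap * (1 - reserve_fraction))
    total_weight = sum(weight for _, weight in requests)  # numScaledRequests
    scaled = [size * weight / total_weight for size, weight in requests]
    total_scaled = sum(scaled)  # TotalRequestedScaled
    allocations = [int(s * available / total_scaled) for s in scaled]
    return available, allocations


available, allocations = scale_requests(REQUESTS, JVM_MAX_FREE, RESERVE_FRACTION)
size_per_buffer = allocations[0] // 2  # numBuffers=2 in the writer log line

# Duration of the final merge, from "All spills complete" to
# "Finished final spill after merging : 8438 spills".
fmt = "%Y-%m-%d %H:%M:%S,%f"
merge_start = datetime.strptime("2015-11-18 21:21:04,238", fmt)
merge_end = datetime.strptime("2015-11-18 21:54:44,047", fmt)
merge_seconds = (merge_end - merge_start).total_seconds()

print("available:", available)
print("allocations:", allocations)
print("size per buffer:", size_per_buffer)
print("merge minutes:", round(merge_seconds / 60, 1))
print("seconds per spill:", round(merge_seconds / 8438, 3))
```

Under that assumed formula the output request of 104857600 bytes is scaled
down to 3305449 bytes, which matches the Allocations line and gives the
sizePerBuffer=1652724 reported by the writer. So the unordered output is
spilling from two buffers of about 1.6 MB each, which would explain the
very high spill count.
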
On Wed, Nov 18, 2015 at 2:40 PM, Siddharth Seth <[email protected]> wrote:
> In case of UnorderedKVWriter (non-partitioned), a single file is used - to
> which new entries are appended.
>
> For the partitioned case - using a single file is not as straightforward,
> since the number of elements and size of each partition is not known up
> front. Generating a single file per partition can cause an explosion in the
> number of files, as well as the number of streams open in parallel (OOM).
> The current partitioned writer writes data into the in-memory buffer and
> then spills this into files with individual partitions consolidated
> together.
> Without pipelined shuffle - a single file needs to be generated for a
> single task, which is where the merge step comes in - in case the buffer is
> large. With pipelined shuffle - there's almost no extra cost, since there's
> no final merge - and each element is written out exactly once.
>
> That said, optimizations are possible depending upon the use case, e.g. for
> a small number of partitions it's reasonable to write out a file per
> partition. However, the ShuffleHandler and shuffle code will need to change
> to handle this.
>
> Pipelined Shuffle/avoiding a final merge has some limitations in case of
> failures and partial chunks being transferred over. It should be possible
> to work around these by modifying the receiving side to process each input
> only when all data for that source has been received.
>
> Either way, non-trivial changes are required to make this more efficient.
>
> In this particular case, how many partitions were generated, and what was
> the size of the unordered output buffer? Increasing the buffer size for
> this particular job can help mitigate the problem - maybe not with 8500
> spills though.
>
>
>
> On Wed, Nov 18, 2015 at 1:32 PM, Rohini Palaniswamy <[email protected]> wrote:
>
> > Came across a job which was taking a long time in
> > UnorderedPartitionedKVWriter.mergeAll. Saw that it was decompressing and
> > reading data from spill files (8500 spills) and then writing the final
> > compressed merge file. Why do we need spill files for
> > UnorderedPartitionedKVWriter? Why not just buffer and keep writing
> > directly to the final file, which would save a lot of time?
> >
> > Regards,
> > Rohini
> >
>