Filed https://issues.apache.org/jira/browse/TEZ-2950

On Wed, Nov 18, 2015 at 2:56 PM, Rohini Palaniswamy wrote:

> The approach is proving very bad in terms of performance, as decompressing
> and merging a large number of spills is taking a long time for 999
> partitions. We have a join followed by a UNION. The join vertex
> uses UnorderedPartitionedKVWriter for output, as no sort is required for input to the Union.
> Merging 8436 spills is taking 30 mins. I will go ahead and file
> a jira.
>
> 2015-11-18 21:01:25,904 [INFO] [main] |resources.MemoryDistributor|:
> InitialMemoryDistributor (isEnabled=true) invoked with: numInputs=2,
> numOutputs=1, JVM.maxFree=3102212096,
> allocatorClassName=org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor
> 2015-11-18 21:01:26,295 [INFO] [TezChild] |resources.MemoryDistributor|:
> InitialRequests=[scope-6987:OUTPUT:104857600:org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput],
> [scope-6974:INPUT:1861327360:org.apache.tez.runtime.library.input.OrderedGroupedKVInput],
> [scope-6957:INPUT:1861327360:org.apache.tez.runtime.library.input.OrderedGroupedKVInput]
> 2015-11-18 21:01:26,303 [INFO] [TezChild]
> |resources.WeightedScalingMemoryDistributor|:
> ScaleRatiosUsed=[PARTITIONED_UNSORTED_OUTPUT:1][UNSORTED_OUTPUT:1][UNSORTED_INPUT:1][SORTED_OUTPUT:12][SORTED_MERGED_INPUT:12][PROCESSOR:1][OTHER:1]
> 2015-11-18 21:01:26,307 [INFO] [TezChild]
> |resources.WeightedScalingMemoryDistributor|: InitialReservationFraction=0.5,
> AdditionalReservationFractionForIOs=0.045, finalReserveFractionUsed=0.545
> 2015-11-18 21:01:26,308 [INFO] [TezChild]
> |resources.WeightedScalingMemoryDistributor|: Scaling Requests.
> NumRequests: 3, numScaledRequests: 25, TotalRequested: 3827512320, TotalRequestedScaled:
> 1.7910685696E9, TotalJVMHeap: 3102212096, TotalAvailable: 1411506503,
> TotalRequested/TotalJVMHeap:1.23
> 2015-11-18 21:01:26,308 [INFO] [TezChild] |resources.MemoryDistributor|:
> Allocations=[scope-6987:org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput:OUTPUT:104857600:3305449],
> [scope-6974:org.apache.tez.runtime.library.input.OrderedGroupedKVInput:INPUT:1861327360:704100526],
> [scope-6957:org.apache.tez.runtime.library.input.OrderedGroupedKVInput:INPUT:1861327360:704100526]
> 2015-11-18 21:02:49,010 [INFO] [TezChild]
> |writers.UnorderedPartitionedKVWriter|: scope_6987: numBuffers=2,
> sizePerBuffer=1652724, skipBuffers=false, pipelinedShuffle=false,
> numPartitions=999
> ......
> 2015-11-18 21:21:03,353 [INFO] [UnorderedOutSpiller {scope_6987}]
> |writers.UnorderedPartitionedKVWriter|: scope_6987: Finished spill 8436
> 2015-11-18 21:21:04,236 [INFO] [TezChild] |task.TezTaskRunner|: Closing task,
> taskAttemptId=attempt_1444575566264_610936_1_28_000475_0
> ......
> 2015-11-18 21:21:04,238 [INFO] [TezChild]
> |writers.UnorderedPartitionedKVWriter|: scope_6987: Waiting for all spills to
> complete : Pending : 0
> 2015-11-18 21:21:04,238 [INFO] [TezChild]
> |writers.UnorderedPartitionedKVWriter|: scope_6987: All spills complete
> 2015-11-18 21:54:44,047 [INFO] [TezChild]
> |writers.UnorderedPartitionedKVWriter|: scope_6987: Finished final spill
> after merging : 8438 spills
> ....
>
> On Wed, Nov 18, 2015 at 2:40 PM, Siddharth Seth wrote:
>
>> In case of UnorderedKVWriter (non-partitioned), a single file is used, to
>> which new entries are appended.
>>
>> For the partitioned case, using a single file is not as straightforward,
>> since the number of elements and size of each partition is not known up
>> front.
>> Generating a single file per partition can cause an explosion in the
>> number of files, as well as in the number of streams open in parallel (OOM).
>> The current partitioned writer writes data into the in-memory buffer and
>> then spills this into files with individual partitions consolidated
>> together.
>> Without pipelined shuffle, a single file needs to be generated for a
>> single task, which is where the merge step comes in, in case the buffer is
>> large. With pipelined shuffle, there's almost no extra cost, since there's
>> no final merge, and each element is written out exactly once.
>>
>> That said, optimizations are possible depending upon the use case. e.g.
>> For a small number of partitions, it's reasonable to write out a file per
>> partition. However, the ShuffleHandle and shuffle code will need to change
>> to handle this.
>>
>> Pipelined shuffle/avoiding a final merge has some limitations in case of
>> failures and partial chunks being transferred over. It should be possible
>> to work around these by modifying the receiving side to process each input
>> only when all data for that source has been received.
>>
>> Either way, non-trivial changes are required to make this more efficient.
>>
>> In this particular case, how many partitions were generated, and what was
>> the size of the unordered output buffer? Increasing the buffer size for
>> this particular job can help mitigate the problem, though maybe not with 8500
>> spills.
>>
>> On Wed, Nov 18, 2015 at 1:32 PM, Rohini Palaniswamy wrote:
>>
>>> Came across a job which was taking a long time in
>>> UnorderedPartitionedKVWriter.mergeAll. Saw that it was decompressing and
>>> reading data from spill files (8500 spills) and then writing the final
>>> compressed merge file. Why do we need spill files for
>>> UnorderedPartitionedKVWriter? Why not just buffer and keep writing directly
>>> to the final file, which would save a lot of time?
>>>
>>> Regards,
>>> Rohini
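
The memory figures in the MemoryDistributor log lines can be reproduced with a few lines of arithmetic. This is only a sketch: the rule it applies (reserve finalReserveFractionUsed of JVM.maxFree, weight each request by its ScaleRatio, then split the remainder in proportion to request * ratio) is inferred from the logged values, not taken from the WeightedScalingMemoryDistributor source, and scale_requests is a hypothetical name. It shows that the 100 MB request of the UnorderedPartitionedKVOutput was scaled down to roughly 3.3 MB. The split into numBuffers=2 of sizePerBuffer=1652724 is also an assumption (two equal buffers), chosen because it matches the writer's log line.

```python
# Numbers copied from the MemoryDistributor log lines in the thread.
JVM_MAX_FREE = 3102212096
FINAL_RESERVE_FRACTION = 0.545  # InitialReservationFraction 0.5 + 0.045 for IOs

# (scope, requested bytes, scale ratio taken from ScaleRatiosUsed)
REQUESTS = [
    ("scope-6987", 104857600, 1),    # PARTITIONED_UNSORTED_OUTPUT:1
    ("scope-6974", 1861327360, 12),  # SORTED_MERGED_INPUT:12
    ("scope-6957", 1861327360, 12),  # SORTED_MERGED_INPUT:12
]


def scale_requests(heap, reserve_fraction, requests):
    """Hypothetical reconstruction of the weighted scaling seen in the log."""
    available = int(heap * (1 - reserve_fraction))
    num_scaled = sum(ratio for _, _, ratio in requests)
    weighted_total = sum(size * ratio for _, size, ratio in requests)
    # Assumed rule: each request gets a share proportional to size * ratio.
    allocations = {
        scope: available * size * ratio // weighted_total
        for scope, size, ratio in requests
    }
    return available, num_scaled, weighted_total / num_scaled, allocations


available, num_scaled, total_scaled, allocs = scale_requests(
    JVM_MAX_FREE, FINAL_RESERVE_FRACTION, REQUESTS)
total_requested = sum(size for _, size, _ in REQUESTS)

print("TotalAvailable:", available)            # 1411506503
print("numScaledRequests:", num_scaled)        # 25
print("TotalRequestedScaled:", total_scaled)   # 1791068569.6
print("TotalRequested/TotalJVMHeap:", round(total_requested / JVM_MAX_FREE, 2))  # 1.23
print("Allocations:", allocs)
# Assumes the writer splits its allocation into two equal buffers.
print("sizePerBuffer:", allocs["scope-6987"] // 2)  # 1652724
```

Because the two sorted inputs carry a weight of 12 each against 1 for the partitioned output, the output's share of the roughly 1.41 GB available is only about 0.23%, which fits the very large spill count for 999 partitions.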

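Siddharth's description of the partitioned writer can be illustrated with a toy model. The Python below is a hypothetical simplification, not Tez code, and every name in it is made up for illustration: records are buffered per partition, each spill lays the partitions out one after another with an (offset, length) index, and without pipelined shuffle close() copies every partition's segment from every spill into one final file. There is no compression here; in the real writer the merge also decompresses and recompresses the spills, which is the cost Rohini observed. The bytes_written counter shows the trade-off: the non-pipelined path writes every byte twice, the pipelined path once.

```python
from collections import defaultdict


class PartitionedSpillWriter:
    """Hypothetical, simplified model of a partitioned unordered writer."""

    def __init__(self, num_partitions, buffer_limit):
        self.num_partitions = num_partitions
        self.buffer_limit = buffer_limit  # bytes held in memory before a spill
        self.buffer = defaultdict(list)   # partition -> buffered records
        self.buffered_bytes = 0
        self.spills = []                  # list of (data, index) pairs
        self.bytes_written = 0            # total bytes written to "disk"

    def write(self, partition, record):
        self.buffer[partition].append(record)
        self.buffered_bytes += len(record)
        if self.buffered_bytes >= self.buffer_limit:
            self._spill()

    def _spill(self):
        # One spill file: partitions laid out consecutively, plus an index
        # where index[p] = (offset, length) of partition p in this spill.
        data = bytearray()
        index = []
        for p in range(self.num_partitions):
            start = len(data)
            for record in self.buffer.get(p, ()):
                data += record
            index.append((start, len(data) - start))
        self.spills.append((bytes(data), index))
        self.bytes_written += len(data)
        self.buffer.clear()
        self.buffered_bytes = 0

    def close(self, pipelined=False):
        if self.buffered_bytes:
            self._spill()
        if pipelined:
            # Each spill is handed to the consumer as-is: no final merge.
            return self.spills
        # Final merge: for each partition, concatenate its segment from
        # every spill, so every byte is written a second time.
        final = bytearray()
        final_index = []
        for p in range(self.num_partitions):
            start = len(final)
            for data, index in self.spills:
                offset, length = index[p]
                final += data[offset:offset + length]
            final_index.append((start, len(final) - start))
        self.bytes_written += len(final)
        return [(bytes(final), final_index)]


if __name__ == "__main__":
    for pipelined in (False, True):
        w = PartitionedSpillWriter(num_partitions=3, buffer_limit=4)
        for i in range(12):
            w.write(i % 3, bytes([i]) * 2)
        w.close(pipelined=pipelined)
        print("pipelined" if pipelined else "merged", w.bytes_written)
        # merged 48, then pipelined 24
```

With the numbers from the log (about 1.6 MB per buffer and 999 partitions), each spill holds on average only about 1.6 KB per partition, so the final merge has to stitch together thousands of small segments for every partition.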