[ 
https://issues.apache.org/jira/browse/PIG-4104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14094350#comment-14094350
 ] 

Rohini Palaniswamy commented on PIG-4104:
-----------------------------------------

Can't we call the accumulative function on the current key and its set of
values, instead of adding to the buffer, iterating until all keys are done, and
calling pkgr.getNext() at the end? By definition, an Accumulator UDF is called
with a set of values for a key, and that set is not supposed to be the full
list of values. What I am thinking of is something like having
AccumulatorEvalFunc.accumulate() invoked from POUserFunc on every iteration of
the while loop in POShuffleTezLoad, with that key and value iterator. But I
have not looked at the accumulator implementation yet, so I am not sure if
this is possible.
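
To illustrate the idea, here is a minimal, self-contained sketch of feeding an
accumulator one bounded batch at a time rather than buffering every value for
a key. It does not use Pig or Tez classes: SimpleAccumulator, SumAccumulator,
processKey and the batch size are all hypothetical names made up for this
example, and the real POShuffleTezLoad/POUserFunc wiring may look quite
different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BatchedAccumulateSketch {

    /** Hypothetical stand-in for an accumulator-style UDF; not Pig's actual interface. */
    interface SimpleAccumulator<V, R> {
        void accumulate(List<V> batch); // called repeatedly with a partial set of values
        R getValue();                   // result after all batches for the key were seen
        void cleanup();                 // reset state before the next key
    }

    /** Example accumulator that keeps only a running sum, never the values themselves. */
    static class SumAccumulator implements SimpleAccumulator<Long, Long> {
        private long sum;

        @Override
        public void accumulate(List<Long> batch) {
            for (long v : batch) {
                sum += v;
            }
        }

        @Override
        public Long getValue() {
            return sum;
        }

        @Override
        public void cleanup() {
            sum = 0;
        }
    }

    /**
     * Streams the values of one key into the accumulator in batches of at most
     * batchSize, so memory use is bounded by the batch size, not the key's size.
     * Assumes the accumulator does not retain the batch list, since it is reused.
     */
    static <V, R> R processKey(Iterator<V> values, SimpleAccumulator<V, R> acc, int batchSize) {
        List<V> batch = new ArrayList<>(batchSize);
        while (values.hasNext()) {
            batch.add(values.next());
            if (batch.size() == batchSize) {
                acc.accumulate(batch);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            acc.accumulate(batch); // flush the final, partially filled batch
        }
        R result = acc.getValue();
        acc.cleanup();
        return result;
    }

    public static void main(String[] args) {
        Iterator<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L).iterator();
        System.out.println(processKey(values, new SumAccumulator(), 2));
    }
}
```

The point of the sketch is only that the values iterator is consumed once and
nothing beyond the current batch is held in memory. Whether the shuffle input
can be handed to the UDF this way is exactly the open question above.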

> Accumulator UDF throws OOM in Tez
> ---------------------------------
>
>                 Key: PIG-4104
>                 URL: https://issues.apache.org/jira/browse/PIG-4104
>             Project: Pig
>          Issue Type: Sub-task
>          Components: tez
>            Reporter: Cheolsoo Park
>            Assignee: Cheolsoo Park
>             Fix For: 0.14.0
>
>
> This is somewhat expected since we copy lots of objects in POShuffleTezLoad
> for accumulator UDFs. With large data, it consistently fails with OOM. We need
> to re-implement it.
> Here is an example stack trace:
> {code}
> 2014-08-02 02:59:15,801 ERROR [TezChild] org.apache.tez.runtime.task.TezTaskRunner: Exception of type Error. Exiting now
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
>     at java.lang.StringCoding.decode(StringCoding.java:193)
>     at java.lang.String.<init>(String.java:416)
>     at org.apache.pig.data.BinInterSedes$BinInterSedesTupleRawComparator.compareBinInterSedesDatum(BinInterSedes.java:964)
>     at org.apache.pig.data.BinInterSedes$BinInterSedesTupleRawComparator.compareBinSedesTuple(BinInterSedes.java:770)
>     at org.apache.pig.data.BinInterSedes$BinInterSedesTupleRawComparator.compare(BinInterSedes.java:728)
>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator.compare(PigTupleSortComparator.java:100)
>     at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.lessThan(TezMerger.java:539)
>     at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:144)
>     at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:108)
>     at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.adjustPriorityQueue(TezMerger.java:486)
>     at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.next(TezMerger.java:503)
>     at org.apache.tez.runtime.library.common.ValuesIterator.readNextKey(ValuesIterator.java:179)
>     at org.apache.tez.runtime.library.common.ValuesIterator.access$300(ValuesIterator.java:45)
>     at org.apache.tez.runtime.library.common.ValuesIterator$1$1.next(ValuesIterator.java:138)
>     at org.apache.pig.backend.hadoop.executionengine.tez.POShuffleTezLoad.getNextTuple(POShuffleTezLoad.java:176)
>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
>     at org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez.getNextTuple(POStoreTez.java:113)
>     at org.apache.pig.backend.hadoop.executionengine.tez.PigProcessor.runPipeline(PigProcessor.java:313)
>     at org.apache.pig.backend.hadoop.executionengine.tez.PigProcessor.run(PigProcessor.java:196)
>     at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
>     at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
>     at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)
