[
https://issues.apache.org/jira/browse/PIG-4104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14094350#comment-14094350
]
Rohini Palaniswamy commented on PIG-4104:
-----------------------------------------
Can't we call the accumulative function on the current key and its set of
values, instead of adding to the buffer, iterating until all keys are done, and
calling pkgr.getNext() at the end? The definition of an Accumulator UDF is that
the UDF is called with a set of values for a key, and that set is not supposed
to be the full list of values. What I am thinking of is having
AccumulatorEvalFunc.accumulate() invoked from POUserFunc on every iteration of
the while loop in POShuffleTezLoad, with that key and value iterator. I have
not looked at the accumulator implementation yet, though, so I am not sure
whether this is possible.
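To make the idea concrete, here is a minimal, self-contained sketch of streaming one key's values to an accumulator in small batches instead of buffering them all. It does not use Pig or Tez classes: the `Accumulator` interface below is a hypothetical stand-in that only borrows the accumulate / getValue / cleanup shape, and `streamKey`, `SumAccumulator`, and `batchSize` are names invented for illustration. How this would map onto POShuffleTezLoad and POUserFunc is exactly the open question above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class AccumulateSketch {

    /** Hypothetical stand-in for an accumulator-style UDF; not Pig's real interface. */
    public interface Accumulator<T> {
        void accumulate(List<Long> batch); // called repeatedly with a partial set of values
        T getValue();                      // called once after the last batch for a key
        void cleanup();                    // resets state before the next key
    }

    /** Example UDF: keeps only a running total, never the values themselves. */
    public static class SumAccumulator implements Accumulator<Long> {
        private long total = 0;

        @Override
        public void accumulate(List<Long> batch) {
            for (long v : batch) {
                total += v;
            }
        }

        @Override
        public Long getValue() {
            return total;
        }

        @Override
        public void cleanup() {
            total = 0;
        }
    }

    /**
     * Feeds the values of a single key to the accumulator in batches of at most
     * batchSize, so memory use is bounded by the batch and not by the key's
     * full value list.
     */
    public static <T> T streamKey(Iterator<Long> values, Accumulator<T> udf, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Long> batch = new ArrayList<>(batchSize);
        while (values.hasNext()) {
            batch.add(values.next());
            if (batch.size() == batchSize) {
                udf.accumulate(batch);
                batch.clear(); // reuse the buffer; the UDF must not hold on to it
            }
        }
        if (!batch.isEmpty()) {
            udf.accumulate(batch); // flush the final partial batch
        }
        T result = udf.getValue();
        udf.cleanup();
        return result;
    }

    public static void main(String[] args) {
        Iterator<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L).iterator();
        System.out.println(streamKey(values, new SumAccumulator(), 2)); // prints 15
    }
}
```

The point of the sketch is only that the per-key loop hands off each batch as soon as it is full, so nothing larger than one batch is ever held at once.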
> Accumulator UDF throws OOM in Tez
> ---------------------------------
>
> Key: PIG-4104
> URL: https://issues.apache.org/jira/browse/PIG-4104
> Project: Pig
> Issue Type: Sub-task
> Components: tez
> Reporter: Cheolsoo Park
> Assignee: Cheolsoo Park
> Fix For: 0.14.0
>
>
> This is somewhat expected since we copy lots of objects in POShuffleTezLoad
> for accumulator UDFs. With large data, it consistently fails with OOM. We need
> to re-implement it.
> Here is an example stack trace:
> {code}
> 2014-08-02 02:59:15,801 ERROR [TezChild] org.apache.tez.runtime.task.TezTaskRunner: Exception of type Error. Exiting now
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
> at java.lang.StringCoding.decode(StringCoding.java:193)
> at java.lang.String.<init>(String.java:416)
> at org.apache.pig.data.BinInterSedes$BinInterSedesTupleRawComparator.compareBinInterSedesDatum(BinInterSedes.java:964)
> at org.apache.pig.data.BinInterSedes$BinInterSedesTupleRawComparator.compareBinSedesTuple(BinInterSedes.java:770)
> at org.apache.pig.data.BinInterSedes$BinInterSedesTupleRawComparator.compare(BinInterSedes.java:728)
> at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator.compare(PigTupleSortComparator.java:100)
> at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.lessThan(TezMerger.java:539)
> at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:144)
> at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:108)
> at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.adjustPriorityQueue(TezMerger.java:486)
> at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.next(TezMerger.java:503)
> at org.apache.tez.runtime.library.common.ValuesIterator.readNextKey(ValuesIterator.java:179)
> at org.apache.tez.runtime.library.common.ValuesIterator.access$300(ValuesIterator.java:45)
> at org.apache.tez.runtime.library.common.ValuesIterator$1$1.next(ValuesIterator.java:138)
> at org.apache.pig.backend.hadoop.executionengine.tez.POShuffleTezLoad.getNextTuple(POShuffleTezLoad.java:176)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
> at org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez.getNextTuple(POStoreTez.java:113)
> at org.apache.pig.backend.hadoop.executionengine.tez.PigProcessor.runPipeline(PigProcessor.java:313)
> at org.apache.pig.backend.hadoop.executionengine.tez.PigProcessor.run(PigProcessor.java:196)
> at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
> at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
> at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
> {code}