[ https://issues.apache.org/jira/browse/PIG-4166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174309#comment-14174309 ]
Daniel Dai commented on PIG-4166: --------------------------------- I unlinked it from 0.14 since this sounds like a corner case and no one seems to be actively working on it. Are you working on it, or do you have a test case so I can take a look? > Collected group drops last record when combined with merge join > --------------------------------------------------------------- > > Key: PIG-4166 > URL: https://issues.apache.org/jira/browse/PIG-4166 > Project: Pig > Issue Type: Bug > Affects Versions: 0.12.0 > Reporter: Brian Johnson > Fix For: 0.15.0 > > > If the final two keys in each relation join, they will never make it to the > final output. The reason is that POMergeJoin does a read-ahead and > POCollectedGroup doesn't call processInput when this.parentPlan.endOfAllInput > == true. This prevents the final join from being output because POMergeJoin > never sees endOfAllInput == true. > {code} > diff --git > a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java > > b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java > index c355d1d..8fd44fa 100644 > --- > a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java > +++ > b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java > @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator { > @Override > public Result getNextTuple() throws ExecException { > > - // Since the output is buffered, we need to flush the last > - // set of records when the close method is called by mapper. 
> - if (this.parentPlan.endOfAllInput) { > - if (outputBag != null) { > - Tuple tup = mTupleFactory.newTuple(2); > - tup.set(0, prevKey); > - tup.set(1, outputBag); > - outputBag = null; > - return new Result(POStatus.STATUS_OK, tup); > - } > - > - return new Result(POStatus.STATUS_EOP, null); > - } > + > > Result inp = null; > Result res = null; > > while (true) { > inp = processInput(); > + > if (inp.returnStatus == POStatus.STATUS_EOP || > inp.returnStatus == POStatus.STATUS_ERR) { > - break; > + // Since the output is buffered, we need to flush the last > + // set of records when the close method is called by mapper. > + if (this.parentPlan.endOfAllInput) { > + if (outputBag != null) { > + Tuple tup = mTupleFactory.newTuple(2); > + tup.set(0, prevKey); > + tup.set(1, outputBag); > + outputBag = null; > + return new Result(POStatus.STATUS_OK, tup); > + } > + > + return new Result(POStatus.STATUS_EOP, null); > + } else > + break; > } > > if (inp.returnStatus == POStatus.STATUS_NULL) { > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)