[ 
https://issues.apache.org/jira/browse/PIG-4166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Dai updated PIG-4166:
----------------------------
    Attachment: PIG-4166-2.patch

Attached an equivalent patch for trunk and the 0.14 branch.

> Collected group drops last record when combined with merge join
> ---------------------------------------------------------------
>
>                 Key: PIG-4166
>                 URL: https://issues.apache.org/jira/browse/PIG-4166
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.12.0
>            Reporter: Brian Johnson
>             Fix For: 0.15.0
>
>         Attachments: PIG-4166-2.patch, patch
>
>
> If the final two keys in each relation join, they will never make it to the 
> final output. The reason is that POMergeJoin does a read-ahead and 
> POCollectedGroup doesn't call processInput when this.parentPlan.endOfAllInput 
> == true. This prevents the final join from being output because POMergeJoin 
> never sees endOfAllInput == true.
> {code}
> diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> index c355d1d..8fd44fa 100644
> --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
>      @Override
>      public Result getNextTuple() throws ExecException {
>  
> -        // Since the output is buffered, we need to flush the last
> -        // set of records when the close method is called by mapper.
> -        if (this.parentPlan.endOfAllInput) {
> -            if (outputBag != null) {
> -                Tuple tup = mTupleFactory.newTuple(2);
> -                tup.set(0, prevKey);
> -                tup.set(1, outputBag);
> -                outputBag = null;
> -                return new Result(POStatus.STATUS_OK, tup);
> -            }
> -
> -            return new Result(POStatus.STATUS_EOP, null);
> -        }
> +        
>  
>          Result inp = null;
>          Result res = null;
>  
>          while (true) {
>              inp = processInput();
> +
>              if (inp.returnStatus == POStatus.STATUS_EOP ||
>                      inp.returnStatus == POStatus.STATUS_ERR) {
> -                break;
> +               // Since the output is buffered, we need to flush the last
> +                // set of records when the close method is called by mapper.
> +                if (this.parentPlan.endOfAllInput) {
> +                    if (outputBag != null) {
> +                        Tuple tup = mTupleFactory.newTuple(2);
> +                        tup.set(0, prevKey);
> +                        tup.set(1, outputBag);
> +                        outputBag = null;
> +                        return new Result(POStatus.STATUS_OK, tup);
> +                    }
> +
> +                    return new Result(POStatus.STATUS_EOP, null);
> +                } else
> +                       break;
>              }
>  
>              if (inp.returnStatus == POStatus.STATUS_NULL) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to