[ https://issues.apache.org/jira/browse/PIG-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4421:
----------------------------------
    Attachment: PIG-4421_5.patch

The difference between PIG-4421_4.patch and PIG-4421_5.patch:
PIG-4421_5.patch fixes the TestSkewedJoin.testSkewedJoinManyReducers unit test failure.

*Why does TestSkewedJoin.testSkewedJoinManyReducers fail with PIG-4421_4.patch?*

Let me explain with an example:
{code}
-- bin/testSkewedJoinManyReducers.pig
A = load './testSkewedJoin6.txt' as (id,name);
B = load './testSkewedJoin7.txt' as (id,name);
C = join A by id, B by id using 'skewed' parallel 300;
store C into './testSkewedJoinManyReducers.out';
{code}

{code}
cat bin/testSkewedJoin6.txt:
237     0
237     1
237     2
237     3
237     4
{code}

{code}
cat bin/testSkewedJoin7.txt:
237     0
237     1
{code}

Sometimes the output of testSkewedJoinManyReducers.pig is:
{code}
237     0       237     0
237     0       237     1
237     1       237     0
237     1       237     1
237     2       237     0
237     2       237     1
237     3       237     0
237     3       237     1
237     4       237     0
237     4       237     1
{code}

Other times it is:
{code}
237     0       237     0
237     0       237     1
237     0       237     2
237     0       237     3
237     0       237     4
237     1       237     0
237     1       237     1
237     1       237     2
237     1       237     3
237     1       237     4
{code}
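To make the difference concrete, here is a minimal sketch that joins the two sample files on the id field with a naive nested-loop join (not Pig's skewed join implementation; the class and method names are hypothetical). Each output row is the first input's fields followed by the second input's fields, so joining A with B reproduces the first result above, and swapping the inputs reproduces the second.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinOrderDemo {
    // Naive inner join on field 0 (id). Each output row is the left row's
    // fields followed by the right row's fields, so swapping the inputs
    // swaps the column order of the result.
    public static List<String> join(String[][] left, String[][] right) {
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            for (String[] r : right) {
                if (l[0].equals(r[0])) {
                    out.add(String.join("\t", l[0], l[1], r[0], r[1]));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Contents of testSkewedJoin6.txt (A) and testSkewedJoin7.txt (B)
        String[][] a = {{"237", "0"}, {"237", "1"}, {"237", "2"}, {"237", "3"}, {"237", "4"}};
        String[][] b = {{"237", "0"}, {"237", "1"}};

        System.out.println("A join B (expected column order):");
        join(a, b).forEach(System.out::println);

        System.out.println("B join A (inputs swapped):");
        join(b, a).forEach(System.out::println);
    }
}
```

Both results contain the same ten matches; only the side each input lands on differs, which is exactly what the unit test detects.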

*The first result is correct: each row has A's fields followed by B's. The second result is wrong: the join inputs are swapped, so B's fields come first (the left join has effectively become a right join).* This is caused by the following code:
{code}
// SparkLauncher.java#physicalToRDD
private void physicalToRDD(PhysicalPlan plan,
                           PhysicalOperator physicalOperator,
                           Map<OperatorKey, RDD<Tuple>> rdds,
                           List<RDD<Tuple>> rddsFromPredeSparkOper,
                           Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
        throws IOException {
    RDD<Tuple> nextRDD = null;
    // The predecessors are not sorted by OperatorKey. For example,
    // physicalOperator(scope-9) and physicalOperator(scope-10) are both
    // predecessors of physicalOperator(scope-13), but sometimes the list is
    // [scope-9, scope-10] and sometimes [scope-10, scope-9]. This swaps the
    // join inputs and turns the left join into a right join.
    List<PhysicalOperator> predecessors = plan
            .getPredecessors(physicalOperator);
    …..
}
{code}

The fix is to sort the predecessors right after fetching them, so they always come back in OperatorKey order:
{code}
// SparkLauncher.java#physicalToRDD
 private void physicalToRDD(PhysicalPlan plan,
                            PhysicalOperator physicalOperator,
                            Map<OperatorKey, RDD<Tuple>> rdds,
                            List<RDD<Tuple>> rddsFromPredeSparkOper,
                            Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
         throws IOException {
     RDD<Tuple> nextRDD = null;
     List<PhysicalOperator> predecessors = plan
             .getPredecessors(physicalOperator);
+    // Sort by OperatorKey so the join inputs always arrive in the same order
+    if (predecessors != null) {
+        Collections.sort(predecessors);
+    }
     …..
 }
{code}
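A minimal sketch of why sorting makes the order deterministic. It assumes the operators' natural ordering follows their OperatorKey, which (in Pig) holds a scope name plus a numeric id; the Key class below is a hypothetical stand-in for that, not Pig's actual class. Whichever order the predecessors arrive in, sorting yields [scope-9, scope-10]. The last lines show why the comparison has to be numeric on the id: a plain string sort would put "scope-10" before "scope-9".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PredecessorSortDemo {
    // Hypothetical stand-in for an operator key: a scope name plus a numeric id.
    public static final class Key implements Comparable<Key> {
        final String scope;
        final long id;

        public Key(String scope, long id) {
            this.scope = scope;
            this.id = id;
        }

        // Compare by scope first, then numerically by id,
        // so scope-9 sorts before scope-10.
        @Override
        public int compareTo(Key other) {
            int c = scope.compareTo(other.scope);
            return c != 0 ? c : Long.compare(id, other.id);
        }

        @Override
        public String toString() {
            return scope + "-" + id;
        }
    }

    // Mirrors the fix: sort the predecessor list in place (if there is one).
    public static List<Key> sortedPredecessors(List<Key> predecessors) {
        if (predecessors != null) {
            Collections.sort(predecessors);
        }
        return predecessors;
    }

    public static void main(String[] args) {
        Key a = new Key("scope", 9);
        Key b = new Key("scope", 10);

        // The two orders in which the predecessors were observed to come back.
        List<Key> first = new ArrayList<>(Arrays.asList(a, b));
        List<Key> second = new ArrayList<>(Arrays.asList(b, a));
        System.out.println(sortedPredecessors(first));   // [scope-9, scope-10]
        System.out.println(sortedPredecessors(second));  // [scope-9, scope-10]

        // A plain string sort gets this wrong, because '1' < '9'.
        List<String> names = new ArrayList<>(Arrays.asList("scope-9", "scope-10"));
        Collections.sort(names);
        System.out.println(names);                       // [scope-10, scope-9]
    }
}
```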



> implement visitSkewedJoin in SparkCompiler
> ------------------------------------------
>
>                 Key: PIG-4421
>                 URL: https://issues.apache.org/jira/browse/PIG-4421
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4421.patch, PIG-4421_2.patch, PIG-4421_3.patch, 
> PIG-4421_4.patch, PIG-4421_5.patch
>
>
> If visitSkewedJoin is not implemented, the following unit tests fail:
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithGroup
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinMapKey
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinManyReducers
> org.apache.pig.test.TestSkewedJoin.testNonExistingInputPathInSkewJoin
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOneValue
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithNoProperties
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinEmptyInput
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinNullKeys
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOuter
> org.apache.pig.test.TestSkewedJoin.testRecursiveFileListing
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinReducers
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinWithGroup
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinOuter



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
