[ https://issues.apache.org/jira/browse/PIG-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
liyunzhang_intel updated PIG-4421:
----------------------------------
    Attachment: PIG-4421_5.patch

The difference between PIG-4421_4.patch and PIG-4421_5.patch: PIG-4421_5.patch fixes the TestSkewedJoin.testSkewedJoinManyReducers unit test failure.

*Why does TestSkewedJoin.testSkewedJoinManyReducers fail with PIG-4421_4.patch?*

Let me explain with an example.

bin/testSkewedJoinManyReducers.pig:
{code}
A = load './testSkewedJoin6.txt' as (id,name);
B = load './testSkewedJoin7.txt' as (id,name);
C = join A by id, B by id using 'skewed' parallel 300;
store C into './testSkewedJoinManyReducers.out';
{code}

cat bin/testSkewedJoin6.txt:
{code}
237 0
237 1
237 2
237 3
237 4
{code}

cat bin/testSkewedJoin7.txt:
{code}
237 0
237 1
{code}

The result of testSkewedJoinManyReducers.pig is sometimes:
{code}
237 0 237 0
237 0 237 1
237 1 237 0
237 1 237 1
237 2 237 0
237 2 237 1
237 3 237 0
237 3 237 1
237 4 237 0
237 4 237 1
{code}

and sometimes:
{code}
237 0 237 0
237 0 237 1
237 0 237 2
237 0 237 3
237 0 237 4
237 1 237 0
237 1 237 1
237 1 237 2
237 1 237 3
237 1 237 4
{code}

*The first result is the left join (the fields of A come first), which is correct, while the second is the right join (the fields of B come first), which is not correct.*

The cause is the following code in SparkLauncher.java#physicalToRDD:
{code}
private void physicalToRDD(PhysicalPlan plan,
        PhysicalOperator physicalOperator,
        Map<OperatorKey, RDD<Tuple>> rdds,
        List<RDD<Tuple>> rddsFromPredeSparkOper,
        Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
        throws IOException {
    RDD<Tuple> nextRDD = null;
    List<PhysicalOperator> predecessors = plan
            .getPredecessors(physicalOperator);
    // The predecessors are not sorted by OperatorKey. For example,
    // physicalOperator(scope-9) and physicalOperator(scope-10) are both
    // predecessors of physicalOperator(scope-13). Sometimes the list is
    // [scope-9, scope-10] and sometimes it is [scope-10, scope-9].
    // The second order swaps the join inputs, which turns the left join
    // into a right join.
    …..
}
{code}

The solution is to sort the predecessors before they are used:
{code}
private void physicalToRDD(PhysicalPlan plan,
        PhysicalOperator physicalOperator,
        Map<OperatorKey, RDD<Tuple>> rdds,
        List<RDD<Tuple>> rddsFromPredeSparkOper,
        Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
        throws IOException {
    RDD<Tuple> nextRDD = null;
    List<PhysicalOperator> predecessors = plan
            .getPredecessors(physicalOperator);
+    if (predecessors != null) {
+        Collections.sort(predecessors);
+    }
    …..
}
{code}

> implement visitSkewedJoin in SparkCompiler
> ------------------------------------------
>
>                 Key: PIG-4421
>                 URL: https://issues.apache.org/jira/browse/PIG-4421
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4421.patch, PIG-4421_2.patch, PIG-4421_3.patch, PIG-4421_4.patch, PIG-4421_5.patch
>
>
> If visitSkewedJoin is not implemented, the following unit tests will fail:
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithGroup
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinMapKey
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinManyReducers
> org.apache.pig.test.TestSkewedJoin.testNonExistingInputPathInSkewJoin
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOneValue
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithNoProperties
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinEmptyInput
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinNullKeys
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOuter
> org.apache.pig.test.TestSkewedJoin.testRecursiveFileListing
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinReducers
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinWithGroup
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinOuter

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)