[ 
https://issues.apache.org/jira/browse/PIG-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liyunzhang_intel updated PIG-4421:
----------------------------------
    Attachment: PIG-4421_2.patch

PIG-4421_2.patch fixes TestSkewedJoin#testSkewedJoinWithGroup and 
TestJoinSmoke#testSkewedJoinWithGroup, which PIG-4421.patch did not fix.
With PIG-4421_2.patch applied, the SkewedJoin unit tests pass except 
TestSkewedJoin#testSkewedJoinKeyPartition. That test still fails because 
PIG-4421_2.patch replaces the skewed join with a regular join, and a regular 
join cannot satisfy every unit test in TestSkewedJoin.
The change in PIG-4421_2.patch is: instead of adding POSkewedJoin to the 
sparkPlan as a single operator, it splits POSkewedJoin into 2 
POLocalRearranges, 1 POGlobalRearrange, 1 POPackage and 1 POForEach.
For example, for TestSkewedJoin#testSkewedJoinWithGroup the physical plan and 
spark plan will be:
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-13
|
|---E: SkewedJoin[tuple] - scope-12
    |   |
    |   Project[bytearray][0] - scope-10
    |   |
    |   Project[bytearray][0] - scope-11
    |
    |---C: Package(Packager)[tuple]{bytearray} - scope-2
    |   |
    |   |---C: Global Rearrange[tuple] - scope-1
    |       |
    |       |---C: Local Rearrange[tuple]{bytearray}(false) - scope-3
    |           |   |
    |           |   Project[bytearray][0] - scope-4
    |           |
    |           |---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
    |
    |---D: Package(Packager)[tuple]{bytearray} - scope-7
        |
        |---D: Global Rearrange[tuple] - scope-6
            |
            |---D: Local Rearrange[tuple]{bytearray}(false) - scope-8
                |   |
                |   Project[bytearray][0] - scope-9
                |
                |---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5


#-----------------------------------------------------#
#The Spark node relations are:
#-----------------------------------------------------#
scope-14->scope-15 
scope-15->scope-21 
scope-16->scope-17 
scope-17->scope-21 
scope-21
#--------------------------------------------------
# Spark Plan                                  
#--------------------------------------------------

Spark node scope-14
C: Local Rearrange[tuple]{bytearray}(false) - scope-3
|   |
|   Project[bytearray][0] - scope-4
|
|---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0--------

Spark node scope-15
Local Rearrange[tuple]{bytearray}(false) - scope-18
|   |
|   Project[bytearray][0] - scope-10
|
|---C: Package(Packager)[tuple]{bytearray} - scope-2
    |
    |---C: Global Rearrange[tuple] - scope-1--------

Spark node scope-21
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-13
|
|---New For Each(true,true)[tuple] - scope-25
    |   |
    |   Project[bag][1] - scope-23
    |   |
    |   Project[bag][2] - scope-24
    |
    |---Package(Packager)[tuple]{bytearray} - scope-22
        |
        |---Global Rearrange[tuple] - scope-20--------

Spark node scope-16
D: Local Rearrange[tuple]{bytearray}(false) - scope-8
|   |
|   Project[bytearray][0] - scope-9
|
|---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5--------

Spark node scope-17
Local Rearrange[tuple]{bytearray}(false) - scope-19
|   |
|   Project[bytearray][0] - scope-11
|
|---D: Package(Packager)[tuple]{bytearray} - scope-7
    |
    |---D: Global Rearrange[tuple] - scope-6--------
 {code}
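
The decomposition described above (two POLocalRearranges, one POGlobalRearrange, one POPackage and one POForEach) has the usual rearrange / group / flatten shape of a regular join. A minimal Python sketch of that data flow follows. It is not Pig's implementation: every function name is hypothetical, and details such as null-key handling are ignored.

```python
from collections import defaultdict
from itertools import product


def local_rearrange(rows, key_index, input_index):
    """Tag each row with its join key and the index of the input it came from."""
    return [(row[key_index], input_index, row) for row in rows]


def global_rearrange(*tagged_inputs):
    """Bring together all tagged rows that share a key (the shuffle step)."""
    groups = defaultdict(list)
    for tagged in tagged_inputs:
        for key, input_index, row in tagged:
            groups[key].append((input_index, row))
    return groups


def package(groups, num_inputs):
    """Build one bag per input for every key: (key, bag_0, bag_1, ...)."""
    packaged = []
    for key, tagged_rows in groups.items():
        bags = [[] for _ in range(num_inputs)]
        for input_index, row in tagged_rows:
            bags[input_index].append(row)
        packaged.append((key, *bags))
    return packaged


def foreach_flatten(packaged):
    """Flatten both bags: emit the cross product of the two bags per key."""
    joined = []
    for _key, left_bag, right_bag in packaged:
        for left, right in product(left_bag, right_bag):
            joined.append(left + right)
    return joined


def regular_join(left_rows, right_rows, left_key=0, right_key=0):
    """Chain the four steps, mirroring the operators in the spark plan."""
    left = local_rearrange(left_rows, left_key, 0)
    right = local_rearrange(right_rows, right_key, 1)
    return foreach_flatten(package(global_rearrange(left, right), 2))
```

In this sketch, keys that appear on only one side produce an empty bag for the other input, so the cross product drops them, which gives inner-join behaviour.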
 
If we use PIG-4421.patch instead, the physical plan and spark plan for this script will be:
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-13
|
|---E: SkewedJoin[tuple] - scope-12
    |   |
    |   Project[bytearray][0] - scope-10
    |   |
    |   Project[bytearray][0] - scope-11
    |
    |---C: Package(Packager)[tuple]{bytearray} - scope-2
    |   |
    |   |---C: Global Rearrange[tuple] - scope-1
    |       |
    |       |---C: Local Rearrange[tuple]{bytearray}(false) - scope-3
    |           |   |
    |           |   Project[bytearray][0] - scope-4
    |           |
    |           |---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
    |
    |---D: Package(Packager)[tuple]{bytearray} - scope-7
        |
        |---D: Global Rearrange[tuple] - scope-6
            |
            |---D: Local Rearrange[tuple]{bytearray}(false) - scope-8
                |   |
                |   Project[bytearray][0] - scope-9
                |
                |---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5

#-----------------------------------------------------#
#The Spark node relations are:
#-----------------------------------------------------#
scope-14->scope-18 
scope-16->scope-18 
scope-18
#--------------------------------------------------
# Spark Plan                                  
#--------------------------------------------------

Spark node scope-14
C: Local Rearrange[tuple]{bytearray}(false) - scope-3
|   |
|   Project[bytearray][0] - scope-4
|
|---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0--------

Spark node scope-18
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-13
|
|---E: SkewedJoin[tuple] - scope-12
    |   |
    |   Project[bytearray][0] - scope-10
    |   |
    |   Project[bytearray][0] - scope-11
    |
    |---C: Package(Packager)[tuple]{bytearray} - scope-2
    |   |
    |   |---C: Global Rearrange[tuple] - scope-1
    |
    |---D: Package(Packager)[tuple]{bytearray} - scope-7
        |
        |---D: Global Rearrange[tuple] - scope-6--------

Spark node scope-16
D: Local Rearrange[tuple]{bytearray}(false) - scope-8
|   |
|   Project[bytearray][0] - scope-9
|
|---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5--------
{code}

This plan throws the following exception:
 {code}
  java.lang.NullPointerException
         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.getValueTuple(Packager.java:216)
         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.getNextTuple(POPackage.java:315)
         at org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(PackageConverter.java:128)
         at org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(PackageConverter.java:65)
{code}
The exception occurs because this plan puts 2 POPackages (scope-2 and scope-7) 
in one sparkOperator (scope-18), while 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator.PackageDiscoverer
 only handles a single POPackage per sparkOperator.
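
A toy Python model of this failure mode, assuming a discoverer that remembers a single package per operator. This is not the actual PackageDiscoverer code: the class names and the `key_info` field are hypothetical, and which of the two packages the real visitor keeps, or which field ends up null, is an assumption.

```python
class Package:
    """Stand-in for a package operator that needs annotation before use."""

    def __init__(self, name):
        self.name = name
        self.key_info = None  # None means "never annotated"


class SinglePackageDiscoverer:
    """Walks a list of operators and remembers one package only."""

    def __init__(self):
        self.found = None

    def visit(self, operators):
        for op in operators:
            if isinstance(op, Package):
                # A later package overwrites an earlier one.
                self.found = op
        return self.found


def annotate(operators, key_info):
    """Annotate the single package the discoverer reports, if any."""
    pkg = SinglePackageDiscoverer().visit(operators)
    if pkg is not None:
        pkg.key_info = key_info
```

With two packages in one operator list, only one of them is annotated in this model. The other keeps `key_info = None`, which is one plausible way to reach a null dereference when its tuples are later read.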

> implement visitSkewedJoin in SparkCompiler
> ------------------------------------------
>
>                 Key: PIG-4421
>                 URL: https://issues.apache.org/jira/browse/PIG-4421
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4421.patch, PIG-4421_2.patch
>
>
> If visitSkewedJoin is not implemented, the following unit tests will fail:
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithGroup
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinMapKey
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinManyReducers
> org.apache.pig.test.TestSkewedJoin.testNonExistingInputPathInSkewJoin
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOneValue
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithNoProperties
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinEmptyInput
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinNullKeys
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOuter
> org.apache.pig.test.TestSkewedJoin.testRecursiveFileListing
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinReducers
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinWithGroup
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinOuter



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
