[
https://issues.apache.org/jira/browse/PIG-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
liyunzhang_intel updated PIG-4421:
----------------------------------
Attachment: PIG-4421_2.patch
PIG-4421_2.patch resolves TestSkewedJoin#testSkewedJoinWithGroup and
TestJoinSmoke#testSkewedJoinWithGroup, which were not fixed by PIG-4421.patch.
With PIG-4421_2.patch, the unit tests for SkewedJoin pass except
TestSkewedJoin#testSkewedJoinKeyPartition. This is because PIG-4421_2.patch
replaces the SkewedJoin with a regular join, which cannot satisfy every unit
test in TestSkewedJoin. The changes in PIG-4421_2.patch are:
Instead of just adding POSkewedJoin to the spark plan, divide POSkewedJoin
into 2 POLocalRearranges, 1 POGlobalRearrange, 1 POPackage and 1 POForEach.
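Conceptually, that four-operator pipeline is an ordinary shuffle join. The following plain-Python sketch (toy functions, not Pig's actual operator classes) illustrates what each stage contributes: local rearrange tags records with their key and input index, global rearrange groups by key across inputs, package builds one bag per input, and foreach crosses the bags:

```python
from collections import defaultdict

def local_rearrange(records, key_index, input_index):
    # Tag each record with its join key and originating input (like POLocalRearrange).
    return [(r[key_index], input_index, r) for r in records]

def global_rearrange(*streams):
    # Bring together all tagged records with the same key, as the shuffle
    # would (like POGlobalRearrange).
    grouped = defaultdict(list)
    for stream in streams:
        for key, idx, rec in stream:
            grouped[key].append((idx, rec))
    return grouped

def package(grouped, num_inputs):
    # For each key, build one bag of records per input (like POPackage).
    for key, tagged in grouped.items():
        bags = [[] for _ in range(num_inputs)]
        for idx, rec in tagged:
            bags[idx].append(rec)
        yield key, bags

def foreach_flatten(packaged):
    # Cross the two bags per key to emit joined tuples (like POForEach with flatten).
    for key, (left_bag, right_bag) in packaged:
        for l in left_bag:
            for r in right_bag:
                yield l + r

A = [("a", 1), ("b", 2)]
B = [("a", 10), ("c", 30)]
lr_a = local_rearrange(A, 0, 0)
lr_b = local_rearrange(B, 0, 1)
joined = list(foreach_flatten(package(global_rearrange(lr_a, lr_b), 2)))
print(joined)  # [('a', 1, 'a', 10)]
```

Note this sketch joins on the plain key only; it does not model the sampling and key-splitting that a true skewed join adds on top of this pipeline, which is why the patch behaves like a regular join.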
For example, for TestSkewedJoin#testSkewedJoinWithGroup the physical plan and
spark plan will be:
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-13
|
|---E: SkewedJoin[tuple] - scope-12
| |
| Project[bytearray][0] - scope-10
| |
| Project[bytearray][0] - scope-11
|
|---C: Package(Packager)[tuple]{bytearray} - scope-2
| |
| |---C: Global Rearrange[tuple] - scope-1
| |
| |---C: Local Rearrange[tuple]{bytearray}(false) - scope-3
| | |
| | Project[bytearray][0] - scope-4
| |
|    |---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---D: Package(Packager)[tuple]{bytearray} - scope-7
|
|---D: Global Rearrange[tuple] - scope-6
|
|---D: Local Rearrange[tuple]{bytearray}(false) - scope-8
| |
| Project[bytearray][0] - scope-9
|
|---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5
#-----------------------------------------------------#
#The Spark node relations are:
#-----------------------------------------------------#
scope-14->scope-15
scope-15->scope-21
scope-16->scope-17
scope-17->scope-21
scope-21
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------
Spark node scope-14
C: Local Rearrange[tuple]{bytearray}(false) - scope-3
| |
| Project[bytearray][0] - scope-4
|
|---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Spark node scope-15
Local Rearrange[tuple]{bytearray}(false) - scope-18
| |
| Project[bytearray][0] - scope-10
|
|---C: Package(Packager)[tuple]{bytearray} - scope-2
|
|---C: Global Rearrange[tuple] - scope-1--------
Spark node scope-21
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-13
|
|---New For Each(true,true)[tuple] - scope-25
| |
| Project[bag][1] - scope-23
| |
| Project[bag][2] - scope-24
|
|---Package(Packager)[tuple]{bytearray} - scope-22
|
|---Global Rearrange[tuple] - scope-20--------
Spark node scope-16
D: Local Rearrange[tuple]{bytearray}(false) - scope-8
| |
| Project[bytearray][0] - scope-9
|
|---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5--------
Spark node scope-17
Local Rearrange[tuple]{bytearray}(false) - scope-19
| |
| Project[bytearray][0] - scope-11
|
|---D: Package(Packager)[tuple]{bytearray} - scope-7
|
|---D: Global Rearrange[tuple] - scope-6--------
{code}
If we use PIG-4421.patch, the spark plan for this script will be:
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-13
|
|---E: SkewedJoin[tuple] - scope-12
| |
| Project[bytearray][0] - scope-10
| |
| Project[bytearray][0] - scope-11
|
|---C: Package(Packager)[tuple]{bytearray} - scope-2
| |
| |---C: Global Rearrange[tuple] - scope-1
| |
| |---C: Local Rearrange[tuple]{bytearray}(false) - scope-3
| | |
| | Project[bytearray][0] - scope-4
| |
|    |---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---D: Package(Packager)[tuple]{bytearray} - scope-7
|
|---D: Global Rearrange[tuple] - scope-6
|
|---D: Local Rearrange[tuple]{bytearray}(false) - scope-8
| |
| Project[bytearray][0] - scope-9
|
|---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5
#-----------------------------------------------------#
#The Spark node relations are:
#-----------------------------------------------------#
scope-14->scope-18
scope-16->scope-18
scope-18
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------
Spark node scope-14
C: Local Rearrange[tuple]{bytearray}(false) - scope-3
| |
| Project[bytearray][0] - scope-4
|
|---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Spark node scope-18
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-13
|
|---E: SkewedJoin[tuple] - scope-12
| |
| Project[bytearray][0] - scope-10
| |
| Project[bytearray][0] - scope-11
|
|---C: Package(Packager)[tuple]{bytearray} - scope-2
| |
| |---C: Global Rearrange[tuple] - scope-1
|
|---D: Package(Packager)[tuple]{bytearray} - scope-7
|
|---D: Global Rearrange[tuple] - scope-6--------
Spark node scope-16
D: Local Rearrange[tuple]{bytearray}(false) - scope-8
| |
| Project[bytearray][0] - scope-9
|
|---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5--------
{code}
It will throw the following exception:
{code}
java.lang.NullPointerException
at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.getValueTuple(Packager.java:216)
at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.getNextTuple(POPackage.java:315)
at
org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(PackageConverter.java:128)
at
org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(PackageConverter.java:65)
{code}
The exception occurs because there are 2 POPackages in one sparkOperator, while
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator.PackageDiscoverer
only handles a single POPackage per sparkOperator.
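One possible direction for the fix, sketched below in plain Python with toy stand-in classes (Op, rearranges_feeding, and annotate_packages are hypothetical illustrations, not Pig's real API): instead of assuming exactly one package per operator, walk each package's own predecessor chain to find the local rearranges that feed it, and annotate that package from those alone.

```python
class Op:
    """Toy stand-in for a physical operator in one spark operator's plan."""
    def __init__(self, name, kind, preds=()):
        self.name, self.kind, self.preds = name, kind, list(preds)
        self.key_info = None

def rearranges_feeding(pkg):
    # Walk only THIS package's predecessor chain to find its local rearranges,
    # so a sibling package's rearranges are never picked up by mistake.
    found, stack = [], list(pkg.preds)
    while stack:
        op = stack.pop()
        if op.kind == "local_rearrange":
            found.append(op)
        else:
            stack.extend(op.preds)
    return found

def annotate_packages(plan_ops):
    # Annotate every package in the plan, however many there are, instead of
    # assuming a single POPackage per spark operator (the cause of the NPE).
    for op in plan_ops:
        if op.kind == "package":
            op.key_info = sorted(lr.name for lr in rearranges_feeding(op))
            if not op.key_info:
                raise ValueError("no local rearrange found for " + op.name)

# Mirror the failing plan: two package chains (C and D) in one spark operator.
lr3 = Op("scope-3", "local_rearrange")
gr1 = Op("scope-1", "global_rearrange", [lr3])
pkg2 = Op("scope-2", "package", [gr1])
lr8 = Op("scope-8", "local_rearrange")
gr6 = Op("scope-6", "global_rearrange", [lr8])
pkg7 = Op("scope-7", "package", [gr6])
annotate_packages([lr3, gr1, pkg2, lr8, gr6, pkg7])
print(pkg2.key_info, pkg7.key_info)  # ['scope-3'] ['scope-8']
```

Under this scheme each POPackage gets the key information of its own input chain, so getValueTuple no longer sees a missing key mapping when two packages share one sparkOperator.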
> implement visitSkewedJoin in SparkCompiler
> ------------------------------------------
>
> Key: PIG-4421
> URL: https://issues.apache.org/jira/browse/PIG-4421
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4421.patch, PIG-4421_2.patch
>
>
> If visitSkewedJoin is not implemented, following unittests will fail.
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithGroup
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinMapKey
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinManyReducers
> org.apache.pig.test.TestSkewedJoin.testNonExistingInputPathInSkewJoin
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOneValue
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithNoProperties
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinEmptyInput
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinNullKeys
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOuter
> org.apache.pig.test.TestSkewedJoin.testRecursiveFileListing
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinReducers
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinWithGroup
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinOuter
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)