[ 
https://issues.apache.org/jira/browse/PIG-4771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liyunzhang_intel updated PIG-4771:
----------------------------------
    Attachment: PIG-4771.patch

Use the algorithms in POFRJoin to implement FRJoin for pig on spark mode.
let use an example to explain this feature
frJoin.pig
{code}
A = load './SkewedJoinInput1.txt' as (id,name,n);
B = load './SkewedJoinInput2.txt' as (id,name);
C = filter B by id > 100;
D = join A by (id,name), C by (id,name) using 'replicated';
store D into './testFRJoin.out';
{code}
Physical Plan
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-17
|
|---D: FRJoin[tuple] - scope-11
    |   |
    |   Project[bytearray][0] - scope-7
    |   |
    |   Project[bytearray][1] - scope-8
    |   |
    |   Project[bytearray][0] - scope-9
    |   |
    |   Project[bytearray][1] - scope-10
    |
    |---A: 
Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage)
 - scope-0
    |
    |---C: Filter[bag] - scope-2
        |   |
        |   Greater Than[boolean] - scope-6
        |   |
        |   |---Cast[int] - scope-4
        |   |   |
        |   |   |---Project[bytearray][0] - scope-3
        |   |
        |   |---Constant(100) - scope-5
        |
        |---B: 
Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage)
 - scope-1
{code}

Spark plan
{code}
Spark node scope-40
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-862810646/tmp-1458847763:org.apache.pig.impl.io.InterStorage)
 - scope-41
|
|---C: Filter[bag] - scope-23
    |   |
    |   Greater Than[boolean] - scope-27
    |   |
    |   |---Cast[int] - scope-25
    |   |   |
    |   |   |---Project[bytearray][0] - scope-24
    |   |
    |   |---Constant(100) - scope-26
    |
    |---B: 
Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage)
 - scope-22--------

Spark node scope-39
D: 
Store(hdfs://zly1.sh.intel.com:8020/user/root/testFRJoin.out:org.apache.pig.builtin.PigStorage)
 - scope-38
|
|---D: FRJoin[tuple] - scope-32
    |   |
    |   Project[bytearray][0] - scope-28
    |   |
    |   Project[bytearray][1] - scope-29
    |   |
    |   Project[bytearray][0] - scope-30
    |   |
    |   Project[bytearray][1] - scope-31
    |
    |---A: 
Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage)
 - scope-21--------
{code}

in SparkCompiler#visitFRJoin:We create a sparkOperator to save the result of 
replicated file to the hdfs temporary file. We load the temporary file in 
POFRJoin#setUpHashMap.*Why create a new SparkOperator just load file then store 
it to a temporary file and then load it in POFRJoin#setUpHash?why not just load 
the file in POFRJOin#setUpHash?* This is because we can not gurantee that the 
type of predecessors of FRJoin is POLoad in physical plan, in above case, the 
predecessors of FRJoin is POFIlter and POLoad.

*How to gurantee that replicated files are access to the spark workers?*
Replicated files are stored in hdfs and spark workers can access them. We set 
mapred.submit.replication as "10" to make more backups of replicated files so 
that  spark workers are likely to access the data locally. We don't use 
Distributed Cache( a map-reduce feature) like what is used in MR mode because 
we do not gurantee users install MR when they use pig on spark.


> Implement FR Join for spark engine
> ----------------------------------
>
>                 Key: PIG-4771
>                 URL: https://issues.apache.org/jira/browse/PIG-4771
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4771.patch
>
>
> We use regular join to replace FR join in current code base(fd31fda). We need 
> to implement FR join.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to