[ https://issues.apache.org/jira/browse/PIG-4891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15839327#comment-15839327 ]
liyunzhang_intel commented on PIG-4891:
---------------------------------------

Here is my understanding of this JIRA; let's use an example to explain it.
{code}
A = load './SkewedJoinInput1.txt' as (id,name,n);
B = load './SkewedJoinInput2.txt' as (id,name);
D = join A by (id,name), B by (id,name) using 'replicated';
explain D;
{code}

Before the patch, the Spark plan is:
{code}
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------

Spark node scope-26
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp1749487848/tmp1731009936:org.apache.pig.impl.io.InterStorage) - scope-27
|
|---B: New For Each(false,false)[bag] - scope-13
    |   |
    |   Project[bytearray][0] - scope-9
    |   |
    |   Project[bytearray][1] - scope-11
    |
    |---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-8--------

Spark node scope-25
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-24
|
|---D: FRJoin[tuple] - scope-18
    |   |
    |   Project[bytearray][0] - scope-14
    |   |
    |   Project[bytearray][1] - scope-15
    |   |
    |   Project[bytearray][0] - scope-16
    |   |
    |   Project[bytearray][1] - scope-17
    |
    |---A: New For Each(false,false,false)[bag] - scope-7
        |   |
        |   Project[bytearray][0] - scope-1
        |   |
        |   Project[bytearray][1] - scope-3
        |   |
        |   Project[bytearray][2] - scope-5
        |
        |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
{code}

After the patch, the Spark plan is:
{code}
Spark node scope-28
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-24
|
|---D: FRJoinSpark[tuple] - scope-18
    |   |
    |   Project[bytearray][0] - scope-14
    |   |
    |   Project[bytearray][1] - scope-15
    |   |
    |   Project[bytearray][0] - scope-16
    |   |
    |   Project[bytearray][1] - scope-17
    |
    |---A: New For Each(false,false,false)[bag] - scope-7
    |   |   |
    |   |   Project[bytearray][0] - scope-1
    |   |   |
    |   |   Project[bytearray][1] - scope-3
    |   |   |
    |   |   Project[bytearray][2] - scope-5
    |   |
    |   |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
    |
    |---BroadcastSpark - scope-27
        |
        |---B: New For Each(false,false)[bag] - scope-13
            |   |
            |   Project[bytearray][0] - scope-9
            |   |
            |   Project[bytearray][1] - scope-11
            |
            |---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope
{code}

In the patch:
1. We no longer load the small table into the distributed cache and start a new job to load the data from the distributed cache.
2. We load the small table as an RDD and broadcast that small RDD with SparkContext.broadcast().

> Implement FR join by broadcasting small rdd not making more copys of data
> -------------------------------------------------------------------------
>
>                 Key: PIG-4891
>                 URL: https://issues.apache.org/jira/browse/PIG-4891
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: Nandor Kollar
>             Fix For: spark-branch
>
>
> In the current implementation of FRJoin (PIG-4771), we just set the
> replication of the data to 10 to make data access more efficient, because
> the current FRJoin algorithm can be reused this way. We need to figure out
> how to implement FRJoin by broadcasting the small RDD in the current code
> base if we find that performance can be improved a lot by broadcasting.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
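
To make the broadcast approach in the comment above concrete, here is a minimal sketch of the join logic in plain Python. It is not Pig or Spark code and not the patch itself. The function name replicated_join, the key_len parameter, and the sample rows are all hypothetical. The small relation is indexed once into an in-memory hash table (the structure that would be broadcast), and each row of the large relation probes that table locally, so no second job and no extra on-disk copies of the small relation are needed.

```python
from collections import defaultdict


def replicated_join(large_rows, small_rows, key_len=2):
    """Hash join: index the small relation once, then probe it per large row."""
    # Build phase: this table plays the role of the broadcast small relation.
    table = defaultdict(list)
    for row in small_rows:
        table[tuple(row[:key_len])].append(tuple(row))

    # Probe phase: could run independently on each partition of the large relation.
    joined = []
    for row in large_rows:
        for match in table.get(tuple(row[:key_len]), []):
            joined.append(tuple(row) + match)
    return joined


# Hypothetical rows standing in for SkewedJoinInput1.txt (A) and SkewedJoinInput2.txt (B).
A = [(1, "a", 10), (2, "b", 20), (3, "c", 30)]
B = [(1, "a"), (3, "c"), (4, "d")]

# Equivalent in spirit to: D = join A by (id,name), B by (id,name) using 'replicated';
D = replicated_join(A, B)
```

The sketch assumes the small relation fits in memory, which is the same precondition a replicated join has.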