liyunzhang_intel created PIG-4839:
-------------------------------------

Summary: MultiQueryOptimizerSpark doesn't remove all redundant nodes in spark plan
Key: PIG-4839
URL: https://issues.apache.org/jira/browse/PIG-4839
Project: Pig
Issue Type: Sub-task
Reporter: liyunzhang_intel
Assignee: liyunzhang_intel
Reproduce with TestMultiQueryBasic#testMultiQueryWithFJ_2:
{code}
a = load './passwd' using PigStorage(':') as (uname:chararray,passwd:chararray, uid:int, gid:int);
b = load './passwd' using PigStorage(':') as (uname:chararray,passwd:chararray, uid:int, gid:int);
c = filter a by uid > 5;
store c into './multiQueryFJ.output';
d = filter b by gid > 10;
store d into './multiQueryFJ.output.2';
e = join c by gid, d by gid using 'repl';
store e into './multiQueryFJ.output.3';
{code}

The Spark plan before multiquery optimization:
{code}
before multiquery optimization:
scope-57->scope-60 scope-66
scope-60
scope-61->scope-64 scope-68
scope-64
scope-66
scope-68->scope-66
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------

Spark node scope-61
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1814908586:org.apache.pig.impl.io.InterStorage) - scope-62
|
|---d: Filter[bag] - scope-36
    |   |
    |   Greater Than[boolean] - scope-39
    |   |
    |   |---Project[int][3] - scope-37
    |   |
    |   |---Constant(10) - scope-38
    |
    |---b: New For Each(false,false,false,false)[bag] - scope-35
        |   |
        |   Cast[chararray] - scope-24
        |   |
        |   |---Project[bytearray][0] - scope-23
        |   |
        |   Cast[chararray] - scope-27
        |   |
        |   |---Project[bytearray][1] - scope-26
        |   |
        |   Cast[int] - scope-30
        |   |
        |   |---Project[bytearray][2] - scope-29
        |   |
        |   Cast[int] - scope-33
        |   |
        |   |---Project[bytearray][3] - scope-32
        |
        |---b: Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) - scope-22--------
Spark node scope-64
d: Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.2:org.apache.pig.builtin.PigStorage) - scope-43
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1814908586:org.apache.pig.impl.io.InterStorage) - scope-63--------
Spark node scope-68
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1233897062:org.apache.pig.impl.io.InterStorage) - scope-69
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1814908586:org.apache.pig.impl.io.InterStorage) - scope-67--------
Spark node scope-66
e: Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.3:org.apache.pig.builtin.PigStorage) - scope-56
|
|---e: FRJoin[tuple] - scope-50
    |   |
    |   Project[int][3] - scope-48
    |   |
    |   Project[int][3] - scope-49
    |
    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage) - scope-65--------
Spark node scope-57
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage) - scope-58
|
|---c: Filter[bag] - scope-14
    |   |
    |   Greater Than[boolean] - scope-17
    |   |
    |   |---Project[int][2] - scope-15
    |   |
    |   |---Constant(5) - scope-16
    |
    |---a: New For Each(false,false,false,false)[bag] - scope-13
        |   |
        |   Cast[chararray] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Cast[chararray] - scope-5
        |   |
        |   |---Project[bytearray][1] - scope-4
        |   |
        |   Cast[int] - scope-8
        |   |
        |   |---Project[bytearray][2] - scope-7
        |   |
        |   Cast[int] - scope-11
        |   |
        |   |---Project[bytearray][3] - scope-10
        |
        |---a: Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) - scope-0--------
Spark node scope-60
c: Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output:org.apache.pig.builtin.PigStorage) - scope-21
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage) - scope-59--------
{code}

After Spark multiquery optimization, the 6 Spark nodes are reduced to 4. However, scope-60 should also have been merged into scope-57, but it is not:
{code}
scope-57->scope-60 scope-66
scope-60
scope-61->scope-66
scope-66
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------

Spark node scope-61
Split - scope-70
|   |
|   d: Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.2:org.apache.pig.builtin.PigStorage) - scope-43
|   |
|   Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1233897062:org.apache.pig.impl.io.InterStorage) - scope-69
|
|---d: Filter[bag] - scope-36
    |   |
    |   Greater Than[boolean] - scope-39
    |   |
    |   |---Project[int][3] - scope-37
    |   |
    |   |---Constant(10) - scope-38
    |
    |---b: New For Each(false,false,false,false)[bag] - scope-35
        |   |
        |   Cast[chararray] - scope-24
        |   |
        |   |---Project[bytearray][0] - scope-23
        |   |
        |   Cast[chararray] - scope-27
        |   |
        |   |---Project[bytearray][1] - scope-26
        |   |
        |   Cast[int] - scope-30
        |   |
        |   |---Project[bytearray][2] - scope-29
        |   |
        |   Cast[int] - scope-33
        |   |
        |   |---Project[bytearray][3] - scope-32
        |
        |---b: Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) - scope-22--------
Spark node scope-66
e: Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.3:org.apache.pig.builtin.PigStorage) - scope-56
|
|---e: FRJoin[tuple] - scope-50
    |   |
    |   Project[int][3] - scope-48
    |   |
    |   Project[int][3] - scope-49
    |
    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage) - scope-65--------
Spark node scope-57
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage) - scope-58
|
|---c: Filter[bag] - scope-14
    |   |
    |   Greater Than[boolean] - scope-17
    |   |
    |   |---Project[int][2] - scope-15
    |   |
    |   |---Constant(5) - scope-16
    |
    |---a: New For Each(false,false,false,false)[bag] - scope-13
        |   |
        |   Cast[chararray] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Cast[chararray] - scope-5
        |   |
        |   |---Project[bytearray][1] - scope-4
        |   |
        |   Cast[int] - scope-8
        |   |
        |   |---Project[bytearray][2] - scope-7
        |   |
        |   Cast[int] - scope-11
        |   |
        |   |---Project[bytearray][3] - scope-10
        |
        |---a: Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) - scope-0--------
Spark node scope-60
c: Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output:org.apache.pig.builtin.PigStorage) - scope-21
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage) - scope-59--------
{code}

For comparison, the following is the MR plan after multiquery optimization. There, scope-60 is merged into scope-57 (Split - scope-71), leaving 3 nodes:
{code}
scope-57->scope-66
scope-61->scope-66
scope-66
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-61
Map Plan
Split - scope-70
|   |
|   d: Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.2:org.apache.pig.builtin.PigStorage) - scope-43
|   |
|   Store(hdfs://zly1.sh.intel.com:8020/tmp/temp411366696/tmp-223707761:org.apache.pig.impl.io.InterStorage) - scope-69
|
|---d: Filter[bag] - scope-36
    |   |
    |   Greater Than[boolean] - scope-39
    |   |
    |   |---Project[int][3] - scope-37
    |   |
    |   |---Constant(10) - scope-38
    |
    |---b: New For Each(false,false,false,false)[bag] - scope-35
        |   |
        |   Cast[chararray] - scope-24
        |   |
        |   |---Project[bytearray][0] - scope-23
        |   |
        |   Cast[chararray] - scope-27
        |   |
        |   |---Project[bytearray][1] - scope-26
        |   |
        |   Cast[int] - scope-30
        |   |
        |   |---Project[bytearray][2] - scope-29
        |   |
        |   Cast[int] - scope-33
        |   |
        |   |---Project[bytearray][3] - scope-32
        |
        |---b: Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) - scope-22--------
Global sort: false
----------------
MapReduce node scope-66
Map Plan
e: Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.3:org.apache.pig.builtin.PigStorage) - scope-56
|
|---e: FRJoin[tuple] - scope-50
    |   |
    |   Project[int][3] - scope-48
    |   |
    |   Project[int][3] - scope-49
    |
    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp411366696/tmp-729323405:org.apache.pig.impl.io.InterStorage) - scope-65--------
Global sort: false
----------------
MapReduce node scope-57
Map Plan
Split - scope-71
|   |
|   c: Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output:org.apache.pig.builtin.PigStorage) - scope-21
|   |
|   Store(hdfs://zly1.sh.intel.com:8020/tmp/temp411366696/tmp-729323405:org.apache.pig.impl.io.InterStorage) - scope-58
|
|---c: Filter[bag] - scope-14
    |   |
    |   Greater Than[boolean] - scope-17
    |   |
    |   |---Project[int][2] - scope-15
    |   |
    |   |---Constant(5) - scope-16
    |
    |---a: New For Each(false,false,false,false)[bag] - scope-13
        |   |
        |   Cast[chararray] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Cast[chararray] - scope-5
        |   |
        |   |---Project[bytearray][1] - scope-4
        |   |
        |   Cast[int] - scope-8
        |   |
        |   |---Project[bytearray][2] - scope-7
        |   |
        |   Cast[int] - scope-11
        |   |
        |   |---Project[bytearray][3] - scope-10
        |
        |---a: Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) - scope-0--------
Global sort: false
----------------
{code}

After this JIRA is fixed, [the modification|https://github.com/apache/pig/blob/spark/test/org/apache/pig/test/TestPigRunner.java#L458] in TestPigRunner can be removed.
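The difference between the Spark and MR results can be reproduced with a toy model of the plan graph, using the successor lists printed in this report. This is a hypothetical sketch, not MultiQueryOptimizerSpark's code: `PlanMergeSketch`, `beforePlan`, `merge` and its `allOrNothing` flag are illustrative names, and every successor is assumed to be a trivial Load(tmp) -> Store node that can be folded into its splitter as a Split branch. Folding each successor whose only predecessor is the splitter yields the 3-node MR plan. Additionally requiring *all* of a splitter's successors to qualify is one rule that reproduces the observed 4-node Spark plan (scope-66 has two predecessors, so scope-57 is skipped); the report itself does not identify the actual cause.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of multiquery merging on a plan DAG (not Pig's API).
// Nodes are scope ids only; every successor is assumed to be a trivial
// Load(tmp) -> Store node that can be folded into its splitter as a Split branch.
class PlanMergeSketch {

    // Successor lists of the Spark plan before optimization, as printed in this report.
    static Map<String, List<String>> beforePlan() {
        Map<String, List<String>> g = new LinkedHashMap<>();
        g.put("scope-57", List.of("scope-60", "scope-66"));
        g.put("scope-60", List.of());
        g.put("scope-61", List.of("scope-64", "scope-68"));
        g.put("scope-64", List.of());
        g.put("scope-66", List.of());
        g.put("scope-68", List.of("scope-66"));
        return g;
    }

    // Number of nodes that list `node` as a successor.
    static int predecessorCount(Map<String, List<String>> g, String node) {
        int n = 0;
        for (List<String> succs : g.values()) {
            if (succs.contains(node)) n++;
        }
        return n;
    }

    // Folds successors whose only predecessor is the splitter into the splitter.
    // allOrNothing == false: fold each qualifying successor independently (what the MR plan shows).
    // allOrNothing == true:  skip a splitter unless all of its successors qualify
    //                        (a hypothetical rule that matches the observed Spark plan).
    static Map<String, List<String>> merge(Map<String, List<String>> plan, boolean allOrNothing) {
        Map<String, List<String>> g = new LinkedHashMap<>();
        plan.forEach((k, v) -> g.put(k, new ArrayList<>(v))); // mutable copy
        for (String splitter : new ArrayList<>(g.keySet())) {
            List<String> succs = g.get(splitter);
            if (succs == null) continue; // already folded into another node
            List<String> mergeable = new ArrayList<>();
            for (String s : succs) {
                if (predecessorCount(g, s) == 1) mergeable.add(s);
            }
            if (mergeable.isEmpty() || (allOrNothing && mergeable.size() != succs.size())) continue;
            for (String s : mergeable) {
                succs.remove(s);
                List<String> inherited = g.remove(s);
                if (inherited == null) continue;
                // The splitter inherits the folded node's successors (e.g. scope-68 -> scope-66).
                for (String t : inherited) {
                    if (!succs.contains(t)) succs.add(t);
                }
            }
        }
        return g;
    }

    public static void main(String[] args) {
        System.out.println("all-or-nothing: " + merge(beforePlan(), true));
        System.out.println("per-successor:  " + merge(beforePlan(), false));
    }
}
{code}

Running `main` prints `{scope-57=[scope-60, scope-66], scope-60=[], scope-61=[scope-66], scope-66=[]}` for the all-or-nothing rule, matching the Spark plan above, and `{scope-57=[scope-66], scope-61=[scope-66], scope-66=[]}` for the per-successor rule, matching the MR plan.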