[ https://issues.apache.org/jira/browse/PIG-2652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13254388#comment-13254388 ]
Dmitriy V. Ryaboy commented on PIG-2652:
----------------------------------------

Ok, I have a test case. Estimation isn't triggered when the skewed join is preceded by another join (and perhaps by anything else that has a reduce phase?). Try this script:

{code}
-- lower this so that multiple reducers are forced
set pig.exec.reducers.bytes.per.reducer 118024;
x = load 'tmp/camac10/part*' as (foo:chararray);
y = load 'tmp/camac10/part*' as (foo:chararray);
x2 = load 'tmp/camac10/part*' as (bar:chararray);
x = join x by $0, x2 by $0;
z = join x by $0, y by $0;
store z into 'tmp/x11';
{code}

With both joins being regular hash joins, the stats look like this:

{code}
JobId	Maps	Reduces	MaxMapTime	MinMapTIme	AvgMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	Alias	Feature	Outputs
job_201204041958_154682	2	12	6	6	6	19	19	19	x2,x	HASH_JOIN
job_201204041958_154690	2	9	6	6	6	19	19	19	y,z	HASH_JOIN	hdfs://hadoop-nn/user/dmitriy/tmp/x11,
{code}

Note that *9 reducers* were used by the second join.

Now let's make the second join skewed (just add "using 'skewed'" to the second join statement). New stats:

{code}
JobId	Maps	Reduces	MaxMapTime	MinMapTIme	AvgMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	Alias	Feature	Outputs
job_201204041958_154662	2	12	6	6	6	26	19	23	x2,x	HASH_JOIN
job_201204041958_154664	1	1	6	6	6	19	19	19		SAMPLER
job_201204041958_154667	2	1	6	6	6	19	19	19	y	SKEWED_JOIN	hdfs://hadoop-nn/user/dmitriy/tmp/x10,
{code}

(By the way -- I now notice that the z alias doesn't show up.) Note the single reducer being used for the skewed join job.
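For reference, the input-size-based estimation being discussed is essentially a ceiling division of total input bytes by the `pig.exec.reducers.bytes.per.reducer` threshold, clamped by `pig.exec.reducers.max`. Here is a minimal sketch of that logic (the function name and the input-size figure are illustrative, not from Pig's source; the property defaults are the documented ones):

```python
import math

def estimate_reducers(total_input_bytes,
                      bytes_per_reducer=1_000_000_000,  # pig.exec.reducers.bytes.per.reducer default
                      max_reducers=999):                # pig.exec.reducers.max default
    """Sketch of Pig's input-size-based reducer estimation:
    one reducer per bytes_per_reducer of input, at least 1,
    capped at max_reducers."""
    estimated = int(math.ceil(total_input_bytes / float(bytes_per_reducer)))
    return max(1, min(estimated, max_reducers))

# With the threshold lowered to 118024 bytes as in the script above,
# roughly 1 MB of input is enough to force multiple reducers:
print(estimate_reducers(1_000_000, bytes_per_reducer=118024))  # -> 9
```

This is what produces the 9 reducers seen in the hash-join stats; the bug is that the skewed-join job never runs this estimate and falls back to 1.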
Here's the plan:

{code}
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-32
Map Plan
Union[tuple] - scope-33
|
|---x: Local Rearrange[tuple]{chararray}(false) - scope-14
|   |   |
|   |   Project[chararray][0] - scope-15
|   |
|   |---x: New For Each(false)[bag] - scope-4
|       |   |
|       |   Cast[chararray] - scope-2
|       |   |
|       |   |---Project[bytearray][0] - scope-1
|       |
|       |---x: Load(hdfs://hadoop-nn/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage) - scope-0
|
|---x: Local Rearrange[tuple]{chararray}(false) - scope-16
    |   |
    |   Project[chararray][0] - scope-17
    |
    |---x2: New For Each(false)[bag] - scope-9
        |   |
        |   Cast[chararray] - scope-7
        |   |
        |   |---Project[bytearray][0] - scope-6
        |
        |---x2: Load(hdfs://hadoop-dw-nn.smf1.twitter.com/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage) - scope-5
--------
Reduce Plan
Store(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.io.InterStorage) - scope-35
|
|---POJoinPackage(true,true)[tuple] - scope-64
--------
Global sort: false
----------------
MapReduce node scope-39
Map Plan
Local Rearrange[tuple]{tuple}(false) - scope-42
|   |
|   Constant(all) - scope-41
|
|---New For Each(true,true)[tuple] - scope-40
    |   |
    |   Project[chararray][0] - scope-26
    |   |
    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-37
    |   |
    |   |---Project[tuple][*] - scope-36
    |
    |---Load(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.builtin.PoissonSampleLoader('org.apache.pig.impl.io.InterStorage','100')) - scope-38
--------
Reduce Plan
Store(hdfs://hadoop-nn/tmp/temp1728845767/tmp-1828327704:org.apache.pig.impl.io.InterStorage) - scope-51
|
|---New For Each(false)[tuple] - scope-50
    |   |
    |   POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-49
    |   |
    |   |---Project[tuple][*] - scope-48
    |
    |---New For Each(false,false)[tuple] - scope-47
        |   |
        |   Constant(1) - scope-46
        |   |
        |   Project[bag][1] - scope-44
        |
        |---Package[tuple]{chararray} - scope-43
--------
Global sort: false
Secondary sort: true
----------------
MapReduce node scope-57
Map Plan
Union[tuple] - scope-58
|
|---Local Rearrange[tuple]{chararray}(false) - scope-54
|   |   |
|   |   Project[chararray][0] - scope-26
|   |
|   |---Load(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.io.InterStorage) - scope-52
|
|---Partition rearrange [bag]{chararray}(false) - scope-55
    |   |
    |   Project[chararray][0] - scope-27
    |
    |---y: New For Each(false)[bag] - scope-25
        |   |
        |   Cast[chararray] - scope-23
        |   |
        |   |---Project[bytearray][0] - scope-22
        |
        |---y: Load(hdfs://hadoop-nn/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage) - scope-21
--------
Reduce Plan
z: Store(hdfs://hadoop-nn/user/dmitriy/tmp/x11:org.apache.pig.builtin.PigStorage) - scope-29
|
|---POJoinPackage(true,true)[tuple] - scope-66
--------
Global sort: false
----------------
{code}

Daniel, what do you think about fixing this in SampleOptimizer vs. making the multi-partition change I proposed (make it unnecessary to push the constant around; always generate stats for a large number of partitions and distribute them in WeightedPartitioner)?

> Skew join and order by don't trigger reducer estimation
> -------------------------------------------------------
>
>                 Key: PIG-2652
>                 URL: https://issues.apache.org/jira/browse/PIG-2652
>             Project: Pig
>          Issue Type: Bug
>            Reporter: Bill Graham
>            Assignee: Bill Graham
>             Fix For: 0.10.0, 0.9.3, 0.11
>
>         Attachments: PIG-2652_1.patch
>
>
> If neither PARALLEL, default parallel nor {{mapred.reduce.tasks}} is set, the
> number of reducers is not estimated based on input size for skew joins or
> order by. Instead, these jobs get only 1 reducer.

--
This message is automatically generated by JIRA.
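The multi-partition change mentioned above can be sketched as follows: the sampler always emits a fixed, large number of weighted partitions, and the partitioner folds them onto however many reducers the job actually gets, so the reducer count no longer has to be pushed into the sampling job as a constant. This is an illustrative sketch only; the function and names below are hypothetical, not Pig APIs:

```python
def fold_partitions(partition_weights, num_reducers):
    """Greedily assign pre-computed weighted partitions to reducers
    so that each reducer ends up with roughly equal total weight
    (heaviest partition first, each to the currently lightest reducer)."""
    loads = [0.0] * num_reducers
    assignment = {}
    for part, weight in sorted(partition_weights.items(),
                               key=lambda kv: -kv[1]):
        reducer = loads.index(min(loads))
        assignment[part] = reducer
        loads[reducer] += weight
    return assignment

# 128 sampled partitions, one heavily skewed; fold onto 9 reducers.
weights = {p: 1.0 for p in range(128)}
weights[0] = 40.0
mapping = fold_partitions(weights, 9)
```

Under this scheme the skewed partition naturally gets a reducer mostly to itself, and the same sample output works for any reducer count chosen later by estimation.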