[ https://issues.apache.org/jira/browse/PIG-2652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13254388#comment-13254388 ]

Dmitriy V. Ryaboy commented on PIG-2652:
----------------------------------------

OK, I have a test case. Reducer estimation isn't triggered when the skewed join is
preceded by another join (and perhaps by anything else that has a reduce phase?).

Try this script:
{code}
-- lower this so that multiple reducers are forced
set pig.exec.reducers.bytes.per.reducer 118024;

x = load 'tmp/camac10/part*' as (foo:chararray);
y = load 'tmp/camac10/part*' as (foo:chararray);
x2 = load 'tmp/camac10/part*' as (bar:chararray);
x = join x by $0, x2 by $0;
z = join x by $0, y by $0;
store z into 'tmp/x11';
{code}
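The `set pig.exec.reducers.bytes.per.reducer` line forces multiple reducers because reducer estimation is driven by input size. The Java sketch below assumes the estimate is the total input size divided by the bytes-per-reducer setting, rounded up and clamped between 1 and a cap (modelled on `pig.exec.reducers.max`). `ReducerEstimateSketch` and `estimateReducers` are hypothetical names, not Pig's API. The input size in `main` is made up for illustration.

```java
/**
 * Hypothetical sketch of input-size-based reducer estimation.
 * Assumption: reducers = ceil(totalInputBytes / bytesPerReducer),
 * clamped to [1, maxReducers]. Not Pig's actual implementation.
 */
public final class ReducerEstimateSketch {

    static int estimateReducers(long totalInputBytes, long bytesPerReducer, int maxReducers) {
        if (totalInputBytes < 0 || bytesPerReducer <= 0 || maxReducers <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative and limits positive");
        }
        // Ceiling division written so that it cannot overflow
        long reducers = totalInputBytes / bytesPerReducer
                + (totalInputBytes % bytesPerReducer == 0 ? 0 : 1);
        // At least one reducer, never more than the cap
        return (int) Math.max(1L, Math.min(reducers, (long) maxReducers));
    }

    public static void main(String[] args) {
        long bytesPerReducer = 118_024L; // the value set in the script above
        int maxReducers = 999;           // assumed cap, modelled on pig.exec.reducers.max
        long inputBytes = 500_000L;      // hypothetical input size, for illustration only
        System.out.println(estimateReducers(inputBytes, bytesPerReducer, maxReducers)); // prints 5
    }
}
```

Under this assumption, lowering the bytes-per-reducer value raises the estimate. The script's comment relies on this behaviour.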

With both joins being regular hash joins, the stats look like this:
{code}
JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201204041958_154682 2       12      6       6       6       19      19      19      x2,x    HASH_JOIN
job_201204041958_154690 2       9       6       6       6       19      19      19      y,z     HASH_JOIN       hdfs://hadoop-nn/user/dmitriy/tmp/x11,
{code}

Note that *9 reducers* were used by the second join.

Now let's make the second join skewed (just add "using 'skewed'" to the second 
join statement).
New stats:
{code}
JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201204041958_154662 2       12      6       6       6       26      19      23      x2,x    HASH_JOIN
job_201204041958_154664 1       1       6       6       6       19      19      19              SAMPLER
job_201204041958_154667 2       1       6       6       6       19      19      19      y       SKEWED_JOIN     hdfs://hadoop-nn/user/dmitriy/tmp/x10,
{code}

(By the way, I now notice that the z alias doesn't show up in the Alias column.)

Note that only a single reducer is used for the skewed join job.

Here's the plan:

{code}
#--------------------------------------------------
# Map Reduce Plan                                  
#--------------------------------------------------
MapReduce node scope-32
Map Plan
Union[tuple] - scope-33
|
|---x: Local Rearrange[tuple]{chararray}(false) - scope-14
|   |   |
|   |   Project[chararray][0] - scope-15
|   |
|   |---x: New For Each(false)[bag] - scope-4
|       |   |
|       |   Cast[chararray] - scope-2
|       |   |
|       |   |---Project[bytearray][0] - scope-1
|       |
|       |---x: Load(hdfs://hadoop-nn/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage) - scope-0
|
|---x: Local Rearrange[tuple]{chararray}(false) - scope-16
    |   |
    |   Project[chararray][0] - scope-17
    |
    |---x2: New For Each(false)[bag] - scope-9
        |   |
        |   Cast[chararray] - scope-7
        |   |
        |   |---Project[bytearray][0] - scope-6
        |
        |---x2: Load(hdfs://hadoop-dw-nn.smf1.twitter.com/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage) - scope-5--------
Reduce Plan
Store(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.io.InterStorage) - scope-35
|
|---POJoinPackage(true,true)[tuple] - scope-64--------
Global sort: false
----------------

MapReduce node scope-39
Map Plan
Local Rearrange[tuple]{tuple}(false) - scope-42
|   |
|   Constant(all) - scope-41
|
|---New For Each(true,true)[tuple] - scope-40
    |   |
    |   Project[chararray][0] - scope-26
    |   |
    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-37
    |   |
    |   |---Project[tuple][*] - scope-36
    |
    |---Load(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.builtin.PoissonSampleLoader('org.apache.pig.impl.io.InterStorage','100')) - scope-38--------
Reduce Plan
Store(hdfs://hadoop-nn/tmp/temp1728845767/tmp-1828327704:org.apache.pig.impl.io.InterStorage) - scope-51
|
|---New For Each(false)[tuple] - scope-50
    |   |
    |   POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-49
    |   |
    |   |---Project[tuple][*] - scope-48
    |
    |---New For Each(false,false)[tuple] - scope-47
        |   |
        |   Constant(1) - scope-46
        |   |
        |   Project[bag][1] - scope-44
        |
        |---Package[tuple]{chararray} - scope-43--------
Global sort: false
Secondary sort: true
----------------

MapReduce node scope-57
Map Plan
Union[tuple] - scope-58
|
|---Local Rearrange[tuple]{chararray}(false) - scope-54
|   |   |
|   |   Project[chararray][0] - scope-26
|   |
|   |---Load(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.io.InterStorage) - scope-52
|
|---Partition rearrange [bag]{chararray}(false) - scope-55
    |   |
    |   Project[chararray][0] - scope-27
    |
    |---y: New For Each(false)[bag] - scope-25
        |   |
        |   Cast[chararray] - scope-23
        |   |
        |   |---Project[bytearray][0] - scope-22
        |
        |---y: Load(hdfs://hadoop-nn/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage) - scope-21--------
Reduce Plan
z: Store(hdfs://hadoop-nn/user/dmitriy/tmp/x11:org.apache.pig.builtin.PigStorage) - scope-29
|
|---POJoinPackage(true,true)[tuple] - scope-66--------
Global sort: false
----------------
{code}

Daniel, what do you think about fixing this in SampleOptimizer vs. making the
multi-partition change I proposed (making it unnecessary to push the constant
around by always generating stats for a large number of partitions and
distributing them in WeightedPartitioner)?
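The Java sketch below illustrates the multi-partition proposal under stated assumptions. The sampler is assumed to describe each skewed key as an inclusive range of virtual partitions out of a fixed, large total. The partitioner is assumed to scale virtual indices proportionally onto the actual reducer count, so the sampler never needs the reducer-count constant. The class, its methods and the 10,000-partition figure are hypothetical. This is not `WeightedPartitioner`'s actual implementation or API.

```java
import java.util.Random;

/**
 * Hypothetical sketch: skewed-key stats are computed against a fixed number
 * of virtual partitions, and mapped onto the real reducer count only at
 * partitioning time. Names and numbers are illustrative assumptions.
 */
public final class VirtualPartitionSketch {

    private final int numVirtual; // fixed when sampling, independent of the reducer count
    private final Random random;

    VirtualPartitionSketch(int numVirtual, long seed) {
        if (numVirtual <= 0) {
            throw new IllegalArgumentException("numVirtual must be positive");
        }
        this.numVirtual = numVirtual;
        this.random = new Random(seed);
    }

    /** Proportionally maps a virtual partition index onto [0, numReducers). */
    int toReducer(int virtualPartition, int numReducers) {
        if (virtualPartition < 0 || virtualPartition >= numVirtual || numReducers <= 0) {
            throw new IllegalArgumentException("bad partition index or reducer count");
        }
        return (int) ((long) virtualPartition * numReducers / numVirtual);
    }

    /**
     * For a skewed key sampled to the virtual range [lo, hi], picks one virtual
     * partition at random and maps it, spreading the key's records over the
     * reducers that range covers.
     */
    int reducerForSkewedKey(int lo, int hi, int numReducers) {
        if (lo > hi) {
            throw new IllegalArgumentException("lo must not exceed hi");
        }
        int virtual = lo + random.nextInt(hi - lo + 1);
        return toReducer(virtual, numReducers);
    }

    public static void main(String[] args) {
        VirtualPartitionSketch p = new VirtualPartitionSketch(10_000, 42L);
        // A key occupying the first quarter of the virtual space lands on
        // reducers 0..2 when the job ends up running with 12 reducers.
        System.out.println(p.toReducer(0, 12));     // prints 0
        System.out.println(p.toReducer(2_499, 12)); // prints 2
        System.out.println(p.toReducer(9_999, 12)); // prints 11
        System.out.println(p.reducerForSkewedKey(0, 2_499, 12));
    }
}
```

The design choice here is that the mapping is monotonic. A key's virtual range therefore always covers a contiguous block of real reducers, whatever reducer count the job finally gets.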
                
> Skew join and order by don't trigger reducer estimation
> -------------------------------------------------------
>
>                 Key: PIG-2652
>                 URL: https://issues.apache.org/jira/browse/PIG-2652
>             Project: Pig
>          Issue Type: Bug
>            Reporter: Bill Graham
>            Assignee: Bill Graham
>             Fix For: 0.10.0, 0.9.3, 0.11
>
>         Attachments: PIG-2652_1.patch
>
>
> If neither PARALLEL, default parallel, nor {{mapred.reduce.tasks}} is set, the
> number of reducers is not estimated based on input size for skew joins or
> order by. Instead, these jobs get only 1 reducer.
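The fallback order described in the issue can be sketched as follows. The order (PARALLEL, then default parallel, then `mapred.reduce.tasks`, then input-size estimation) is an assumption drawn from the wording above. `ParallelismSketch`, `chooseReducers` and the `-1` "unset" convention are hypothetical. According to the report, skew join and order by jobs effectively skip the final estimation step and get a single reducer.

```java
import java.util.function.IntSupplier;

/**
 * Hypothetical sketch of choosing a job's reducer count. Assumed precedence:
 * PARALLEL clause, default parallel, mapred.reduce.tasks, then estimation.
 * A value <= 0 means "not set". Not Pig's actual code.
 */
public final class ParallelismSketch {

    static int chooseReducers(int requestedParallel,  // PARALLEL n on the statement
                              int defaultParallel,    // default parallel for the script
                              int mapredReduceTasks,  // mapred.reduce.tasks from the job conf
                              IntSupplier estimator) { // input-size-based estimate
        if (requestedParallel > 0) {
            return requestedParallel;
        }
        if (defaultParallel > 0) {
            return defaultParallel;
        }
        if (mapredReduceTasks > 0) {
            return mapredReduceTasks;
        }
        // Nothing explicit was configured: fall back to the estimate
        return Math.max(1, estimator.getAsInt());
    }

    public static void main(String[] args) {
        IntSupplier estimate = () -> 7; // stand-in value, not taken from the runs above
        System.out.println(chooseReducers(-1, -1, -1, estimate)); // prints 7
        System.out.println(chooseReducers(-1, 20, -1, estimate)); // prints 20
        System.out.println(chooseReducers(5, 20, 30, estimate));  // prints 5
    }
}
```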
