[
https://issues.apache.org/jira/browse/PIG-2652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13254388#comment-13254388
]
Dmitriy V. Ryaboy commented on PIG-2652:
----------------------------------------
Ok, I have a test case. Estimation isn't triggered when the skewed join is
preceded by another join (and perhaps by anything else that has a reduce phase?).
Try this script:
{code}
-- lower this so that multiple reducers are forced
set pig.exec.reducers.bytes.per.reducer 118024;
x = load 'tmp/camac10/part*' as (foo:chararray);
y = load 'tmp/camac10/part*' as (foo:chararray);
x2 = load 'tmp/camac10/part*' as (bar:chararray);
x = join x by $0, x2 by $0;
z = join x by $0, y by $0;
store z into 'tmp/x11';
{code}
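For context, here's roughly how the input-size-based estimation that *should* kick in works. This is a non-authoritative sketch; the default values are my reading of the {{pig.exec.reducers.bytes.per.reducer}} and {{pig.exec.reducers.max}} properties, not a copy of Pig's code:

```python
import math

def estimate_reducers(total_input_bytes,
                      bytes_per_reducer=1_000_000_000,
                      max_reducers=999):
    # One reducer per bytes_per_reducer of input, at least 1,
    # capped at max_reducers (cf. pig.exec.reducers.max).
    reducers = math.ceil(total_input_bytes / bytes_per_reducer)
    return max(1, min(int(reducers), max_reducers))

# With the script's lowered threshold of 118024 bytes per reducer,
# roughly 1 MB of total input maps to 9 reducers:
print(estimate_reducers(1_000_000, bytes_per_reducer=118024))
```

Lowering the threshold in the script is just a way to force multi-reducer jobs on a small test input.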
With both joins being regular hash joins, the stats look like this:
{code}
JobId                    Maps  Reduces  MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias  Feature    Outputs
job_201204041958_154682  2     12       6           6           6           19             19             19             x2,x   HASH_JOIN
job_201204041958_154690  2     9        6           6           6           19             19             19             y,z    HASH_JOIN  hdfs://hadoop-nn/user/dmitriy/tmp/x11,
{code}
Note that *9 reducers* were used by the second join.
Now let's make the second join skewed by adding {{using 'skewed'}} to the second
join statement (i.e., {{z = join x by $0, y by $0 using 'skewed';}}).
New stats:
{code}
JobId                    Maps  Reduces  MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias  Feature      Outputs
job_201204041958_154662  2     12       6           6           6           26             19             23             x2,x   HASH_JOIN
job_201204041958_154664  1     1        6           6           6           19             19             19                    SAMPLER
job_201204041958_154667  2     1        6           6           6           19             19             19             y      SKEWED_JOIN  hdfs://hadoop-nn/user/dmitriy/tmp/x10,
{code}
(By the way -- I now notice that the z alias doesn't show up...)
Note that only a *single reducer* was used for the skewed join job.
Here's the plan:
{code}
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-32
Map Plan
Union[tuple] - scope-33
|
|---x: Local Rearrange[tuple]{chararray}(false) - scope-14
| | |
| | Project[chararray][0] - scope-15
| |
| |---x: New For Each(false)[bag] - scope-4
| | |
| | Cast[chararray] - scope-2
| | |
| | |---Project[bytearray][0] - scope-1
| |
| |---x:
Load(hdfs://hadoop-nn/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage)
- scope-0
|
|---x: Local Rearrange[tuple]{chararray}(false) - scope-16
| |
| Project[chararray][0] - scope-17
|
|---x2: New For Each(false)[bag] - scope-9
| |
| Cast[chararray] - scope-7
| |
| |---Project[bytearray][0] - scope-6
|
|---x2:
Load(hdfs://hadoop-dw-nn.smf1.twitter.com/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage)
- scope-5--------
Reduce Plan
Store(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.io.InterStorage)
- scope-35
|
|---POJoinPackage(true,true)[tuple] - scope-64--------
Global sort: false
----------------
MapReduce node scope-39
Map Plan
Local Rearrange[tuple]{tuple}(false) - scope-42
| |
| Constant(all) - scope-41
|
|---New For Each(true,true)[tuple] - scope-40
| |
| Project[chararray][0] - scope-26
| |
| POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-37
| |
| |---Project[tuple][*] - scope-36
|
|---Load(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.builtin.PoissonSampleLoader('org.apache.pig.impl.io.InterStorage','100'))
- scope-38--------
Reduce Plan
Store(hdfs://hadoop-nn/tmp/temp1728845767/tmp-1828327704:org.apache.pig.impl.io.InterStorage)
- scope-51
|
|---New For Each(false)[tuple] - scope-50
| |
| POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] -
scope-49
| |
| |---Project[tuple][*] - scope-48
|
|---New For Each(false,false)[tuple] - scope-47
| |
| Constant(1) - scope-46
| |
| Project[bag][1] - scope-44
|
|---Package[tuple]{chararray} - scope-43--------
Global sort: false
Secondary sort: true
----------------
MapReduce node scope-57
Map Plan
Union[tuple] - scope-58
|
|---Local Rearrange[tuple]{chararray}(false) - scope-54
| | |
| | Project[chararray][0] - scope-26
| |
|
|---Load(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.io.InterStorage)
- scope-52
|
|---Partition rearrange [bag]{chararray}(false) - scope-55
| |
| Project[chararray][0] - scope-27
|
|---y: New For Each(false)[bag] - scope-25
| |
| Cast[chararray] - scope-23
| |
| |---Project[bytearray][0] - scope-22
|
|---y:
Load(hdfs://hadoop-nn/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage)
- scope-21--------
Reduce Plan
z:
Store(hdfs://hadoop-nn/user/dmitriy/tmp/x11:org.apache.pig.builtin.PigStorage)
- scope-29
|
|---POJoinPackage(true,true)[tuple] - scope-66--------
Global sort: false
----------------
{code}
Daniel, what do you think about fixing this in SampleOptimizer vs. making the
multi-partition change I proposed (make it unnecessary to push the constant
around: always generate stats for a large number of partitions and distribute
them in WeightedPartitioner)?
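For anyone reading along: the skewed-join pipeline in the plan above boils down to the SAMPLER job counting keys, PartitionSkewedKeys turning those counts into a per-key reducer allocation, and the weighted partitioner applying it at join time. A rough, non-authoritative sketch of the allocation idea (names and thresholds here are illustrative, not Pig's actual implementation):

```python
def skewed_key_plan(sampled_key_counts, num_reducers):
    # Rough idea: keys whose sampled frequency exceeds an even
    # per-reducer share get spread across several reducers,
    # proportionally to their weight; all other keys hash as usual.
    total = sum(sampled_key_counts.values())
    per_reducer = max(1, total // num_reducers)
    plan = {}
    for key, count in sampled_key_counts.items():
        if count > per_reducer:
            width = min(num_reducers, -(-count // per_reducer))  # ceil division
            plan[key] = width  # number of reducers this hot key spans
    return plan

# One hot key holding 90% of sampled rows gets 9 of 10 reducers:
print(skewed_key_plan({'a': 90, 'b': 5, 'c': 5}, 10))
```

The multi-partition proposal amounts to computing this table for many partitions up front so the parallelism constant no longer needs to be threaded through the plan.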
> Skew join and order by don't trigger reducer estimation
> -------------------------------------------------------
>
> Key: PIG-2652
> URL: https://issues.apache.org/jira/browse/PIG-2652
> Project: Pig
> Issue Type: Bug
> Reporter: Bill Graham
> Assignee: Bill Graham
> Fix For: 0.10.0, 0.9.3, 0.11
>
> Attachments: PIG-2652_1.patch
>
>
> If none of PARALLEL, default parallel, or {{mapred.reduce.tasks}} is set, the
> number of reducers is not estimated based on input size for skew joins or
> order by. Instead, these jobs get only 1 reducer.