Re: a question about MRCompiler#visitSort

2015-03-31 Thread Rohini Palaniswamy
> My question here: WeightedRangePartitioner only shows how keys are
> distributed and makes every reducer receive an equal amount of data from
> the map side. But can this guarantee a sorted result?
  Yes. It is a range partitioner, and the ranges are determined by
sampling the input and estimating the key distribution, so as to avoid skew.
For example, in very simple terms: if you have alphabetical keys a-z, it
might send a-c to reducer 0, d-m to reducer 1, and n-z to reducer 2. Each
reducer's output is sorted within its own range, so if you read the part
files in order they are sorted.
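
A minimal sketch of that idea in Python, not Pig's actual WeightedRangePartitioner: the cut points and the names `partition_for` and `range_partitioned_sort` are assumptions made up for illustration. Keys are routed to a reducer by range, each reducer sorts only its own keys, and reading the parts in reducer order gives a globally sorted sequence.

```python
from bisect import bisect_left

def partition_for(key, upper_bounds):
    """Return the index of the reducer whose range contains key.

    upper_bounds holds the inclusive upper bound of every range except
    the last one, in ascending order (hypothetical cut points).
    """
    return bisect_left(upper_bounds, key)

def range_partitioned_sort(keys, upper_bounds):
    # One bucket per reducer: len(upper_bounds) + 1 ranges in total.
    parts = [[] for _ in range(len(upper_bounds) + 1)]
    for key in keys:
        parts[partition_for(key, upper_bounds)].append(key)
    # Each reducer sorts only the keys of its own range.
    return [sorted(part) for part in parts]

keys = list("qzbmadxcnk")
# a-c -> reducer 0, d-m -> reducer 1, everything above m -> reducer 2
parts = range_partitioned_sort(keys, upper_bounds=["c", "m"])
# Reading the "part files" in reducer order yields a sorted sequence.
merged = [key for part in parts for key in part]
print(parts, merged == sorted(keys))
```

Because every key in reducer i's range is smaller than every key in reducer i+1's range, no merge step across reducers is needed; the partitioner only has to agree with the sort order.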



a question about MRCompiler#visitSort

2015-03-31 Thread Zhang, Liyun
Hi all,
  I want to ask a question about the following script:
testlimit.pig

a = load './testlimit.txt' as (x:int, y:chararray);
b = order a by x;
c = limit b 1;
store c into './testlimit.out';

In MR mode it generates 4 MapReduce nodes (scope-11, scope-14, scope-29,
scope-40):

scope-11: loads the input data and stores it to a tmp file.
scope-14: sample-loads the tmp file and generates the quantile file:
hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp300898425. I think the
quantile file contains the instance of WeightedRangePartitioner, which shows
how the keys are distributed.
scope-29: uses the quantile file to sort. My question here:
WeightedRangePartitioner only shows how the keys are distributed and makes
every reducer receive an equal amount of data from the map side. But can this
guarantee a sorted result?


#--
# Map Reduce Plan
#--
MapReduce node scope-11
Map Plan
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp694083214:org.apache.pig.impl.io.InterStorage) - scope-12
|
|---a: New For Each(false,false)[bag] - scope-7
 |   |
 |   Cast[int] - scope-2
 |   |
 |   |---Project[bytearray][0] - scope-1
 |   |
 |   Cast[chararray] - scope-5
 |   |
 |   |---Project[bytearray][1] - scope-4
 |
 |---a: Load(hdfs://zly1.sh.intel.com:8020/user/root/testlimit.txt:org.apache.pig.builtin.PigStorage) - scope-0
Global sort: false


MapReduce node scope-14
Map Plan
b: Local Rearrange[tuple]{tuple}(false) - scope-18
|   |
|   Constant(all) - scope-17
|
|---New For Each(false)[tuple] - scope-16
 |   |
 |   Project[int][0] - scope-15
 |
 |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp694083214:org.apache.pig.impl.builtin.RandomSampleLoader('org.apache.pig.impl.io.InterStorage','100')) - scope-13
Reduce Plan
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp300898425:org.apache.pig.impl.io.InterStorage) - scope-27
|
|---New For Each(false)[tuple] - scope-26
 |   |
 |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-25
 |   |
 |   |---Project[tuple][*] - scope-24
 |
 |---New For Each(false,false)[tuple] - scope-23
 |   |
 |   Constant(2) - scope-22
 |   |
 |   Project[bag][1] - scope-20
 |
 |---Package(Packager)[tuple]{chararray} - scope-19
Global sort: false
Secondary sort: true


MapReduce node scope-29
Map Plan
b: Local Rearrange[tuple]{int}(false) - scope-30
|   |
|   Project[int][0] - scope-8
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp694083214:org.apache.pig.impl.io.InterStorage) - scope-28
Combine Plan
Local Rearrange[tuple]{int}(false) - scope-35
|   |
|   Project[int][0] - scope-8
|
|---Limit - scope-34
 |
 |---New For Each(true)[tuple] - scope-33
 |   |
 |   Project[bag][1] - scope-32
 |
 |---Package(LitePackager)[tuple]{int} - scope-31
Reduce Plan
c: Store(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp538566422:org.apache.pig.impl.io.InterStorage) - scope-10
|
|---Limit - scope-39
 |
 |---New For Each(true)[tuple] - scope-38
 |   |
 |   Project[bag][1] - scope-37
 |
 |---Package(LitePackager)[tuple]{int} - scope-36
Global sort: true
Quantile file: hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp300898425


MapReduce node scope-40
Map Plan
b: Local Rearrange[tuple]{int}(false) - scope-42
|   |
|   Project[int][0] - scope-43
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp538566422:org.apache.pig.impl.io.InterStorage) - scope-41
Reduce Plan
c: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-49
|
|---Limit - scope-48
 |
 |---New For Each(true)[bag] - scope-47
 |   |
 |   Project[tuple][1] - scope-46
 |
 |---Package(LitePackager)[tuple]{int} - scope-45
Global sort: false
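
One way to read the scope-29 and scope-40 stages above, sketched in Python under stated assumptions: each reducer of the sort job keeps at most N rows of its own sorted range (the Limit operators in the combine and reduce plans), and a final job re-applies the limit over those partial results in key order. The function names and in-memory lists are hypothetical stand-ins for the MapReduce jobs, not Pig code.

```python
from itertools import chain, islice

def limit_per_partition(sorted_parts, n):
    """Each reducer emits at most n rows from its already sorted range."""
    return [list(islice(part, n)) for part in sorted_parts]

def final_limit(limited_parts, n):
    """A single final pass orders the surviving rows and keeps the first n."""
    return sorted(chain.from_iterable(limited_parts))[:n]

# Rows are (x, y) tuples, already range-partitioned and sorted by x.
sorted_parts = [
    [(1, "a"), (2, "b")],
    [(5, "c"), (7, "d"), (8, "e")],
    [(9, "f")],
]
limited = limit_per_partition(sorted_parts, n=1)   # at most 1 row per reducer
result = final_limit(limited, n=1)                 # global "limit b 1"
print(limited, result)
```

The per-partition limit only reduces how much data reaches the last job; the global answer still comes from the final pass, which sees at most N rows per reducer.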





Kelly Zhang/Zhang,Liyun
Best Regards



Re: a question about MRCompiler#visitSort

2015-03-05 Thread Mohit Sabharwal
Hi Kelly,

I haven't looked at the code in detail, but I assume it's using some
version of 'distributed sample sort'
(http://en.wikipedia.org/wiki/Samplesort) and that the purpose of the
sampling job is to find the distribution of keys, so that each bucket can be
of similar size (i.e. buckets will cover key ranges of different widths, like
a-b, c-m, n-z, etc., but hold a similar number of items) and reducers get a
similar amount of work to do.

thanks,
Mohit
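
A rough sketch of the sampling step described above, in Python. It is an illustration of sample-sort cut points in general, not the code of RandomSampleLoader or FindQuantiles; `quantile_cut_points`, `bucket_sizes`, the sample size of 100, and the synthetic keys are all assumptions.

```python
import random
from bisect import bisect_left

def quantile_cut_points(sample, num_reducers):
    """Pick num_reducers - 1 inclusive upper bounds that split the sample evenly."""
    if len(sample) < num_reducers:
        raise ValueError("need at least one sampled key per reducer")
    ordered = sorted(sample)
    return [ordered[i * len(ordered) // num_reducers - 1]
            for i in range(1, num_reducers)]

def bucket_sizes(keys, cut_points):
    """Count how many keys fall into each range defined by cut_points."""
    sizes = [0] * (len(cut_points) + 1)
    for key in keys:
        sizes[bisect_left(cut_points, key)] += 1
    return sizes

# Unevenly spread keys: denser at the low end of the value range.
keys = [i * i for i in range(1000)]
rng = random.Random(0)              # fixed seed so the run is repeatable
sample = rng.sample(keys, 100)      # stand-in for the sampling job
cuts = quantile_cut_points(sample, num_reducers=4)
sizes = bucket_sizes(keys, cuts)
print(cuts, sizes)
```

The cut points cover key ranges of very different widths, yet the bucket counts come out roughly equal, which is the point of sampling before choosing the ranges.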


a question about MRCompiler#visitSort

2015-03-04 Thread Zhang, Liyun
Hi all:
  I am now implementing PIG-4438 ("limit after sort" does not work in
Spark mode).

testlimit.pig
a = load './testlimit.txt' as (x:int, y:chararray);
b = order a by x;
c = limit b 1;
store c into './testlimit.out';
explain c;

I read the code of MRCompiler#visitSort. Can anyone tell me what
org.apache.pig.impl.builtin.RandomSampleLoader and
org.apache.pig.impl.builtin.FindQuantiles do, and why a sampling job is
needed when using POSort?
I would appreciate it if someone could provide the design document for the
MRCompiler#visitSort implementation.

The following is the MapReduce plan:
#--
# Map Reduce Plan
#--
MapReduce node scope-11
Map Plan
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp694083214:org.apache.pig.impl.io.InterStorage) - scope-12
|
|---a: New For Each(false,false)[bag] - scope-7
 |   |
 |   Cast[int] - scope-2
 |   |
 |   |---Project[bytearray][0] - scope-1
 |   |
 |   Cast[chararray] - scope-5
 |   |
 |   |---Project[bytearray][1] - scope-4
 |
 |---a: Load(hdfs://zly1.sh.intel.com:8020/user/root/testlimit.txt:org.apache.pig.builtin.PigStorage) - scope-0
Global sort: false


MapReduce node scope-14
Map Plan
b: Local Rearrange[tuple]{tuple}(false) - scope-18
|   |
|   Constant(all) - scope-17
|
|---New For Each(false)[tuple] - scope-16
 |   |
 |   Project[int][0] - scope-15
 |
 |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp694083214:org.apache.pig.impl.builtin.RandomSampleLoader('org.apache.pig.impl.io.InterStorage','100')) - scope-13
Reduce Plan
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp300898425:org.apache.pig.impl.io.InterStorage) - scope-27
|
|---New For Each(false)[tuple] - scope-26
 |   |
 |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-25
 |   |
 |   |---Project[tuple][*] - scope-24
 |
 |---New For Each(false,false)[tuple] - scope-23
 |   |
 |   Constant(2) - scope-22
 |   |
 |   Project[bag][1] - scope-20
 |
 |---Package(Packager)[tuple]{chararray} - scope-19
Global sort: false
Secondary sort: true


MapReduce node scope-29
Map Plan
b: Local Rearrange[tuple]{int}(false) - scope-30
|   |
|   Project[int][0] - scope-8
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp694083214:org.apache.pig.impl.io.InterStorage) - scope-28
Combine Plan
Local Rearrange[tuple]{int}(false) - scope-35
|   |
|   Project[int][0] - scope-8
|
|---Limit - scope-34
 |
 |---New For Each(true)[tuple] - scope-33
 |   |
 |   Project[bag][1] - scope-32
 |
 |---Package(LitePackager)[tuple]{int} - scope-31
Reduce Plan
c: Store(hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp538566422:org.apache.pig.impl.io.InterStorage) - scope-10
|
|---Limit - scope-39
 |
 |---New For Each(true)[tuple] - scope-38
 |   |
 |   Project[bag][1] - scope-37
 |
 |---Package(LitePackager)[tuple]{int} - scope-36
Global sort: true
Quantile file: hdfs://zly1.sh.intel.com:8020/tmp/temp2146669591/tmp300898425


MapReduce node scope-40
Map Plan
b: Local Rearrange[tuple]{int}(false) - scope-42
|   |
|   Project[int][0] - scope-43
|
|---Load(hdfs:/