[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515605#comment-15515605
 ] 

liyunzhang_intel edited comment on PIG-5029 at 9/23/16 6:54 AM:
----------------------------------------------------------------

[~kexianda], [~mohitsabharwal] , [~pallavi.rao] and [~xuefuz]:

Currently I encounter a problem when I use RDD.sortByKey() on skewed data, even though [RangePartitioner 
|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
 is used inside RDD.sortByKey.
Here is an example to explain the problem.
{code}
val rdd = sc.parallelize(1.to(2000)).map(x => (x / 2000, x))
val numPartitions = 2
val partitioner = new org.apache.spark.RangePartitioner(numPartitions, rdd)
// count how many records fall into each partition
val numsOfPartitions = rdd.keys.map(k => partitioner.getPartition(k)).countByValue()
numsOfPartitions.map(pair => println("key:" + pair._1 + " val:" + pair._2))
{code}

{code}
The result (key = partition id, val = number of records):
key:0   val:1999
key:1   val:1
{code}

The code generates data like [(0,1),(0,2),...,(0,1999),(1,2000)], and we use 
RangePartitioner to partition it. Although the documentation says it partitions 
sortable records by range into roughly equal ranges, it does not distribute a 
skewed key's records into different partitions evenly.


After reviewing the code of RangePartitioner, I found that it first samples the 
data, orders the sample, and then defines the range bound of each partition so 
that every key goes to the correct partition (here "correct" means that all 
elements in partition (i-1) are smaller than the bound and all elements in 
partition (i+1) are bigger than it). But RangePartitioner cannot guarantee that 
keys are distributed evenly: all records carrying the same key always fall into 
the same partition.
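
To make this concrete, here is a minimal sketch in plain Scala (a hypothetical illustration, not Spark's actual implementation; the rangeBounds value is a made-up stand-in for what sampling would produce on this data) of why range bounds alone can never split one heavily repeated key across partitions:
{code}
// Minimal sketch of range partitioning: the partition id depends only on the
// key's position relative to the bounds, so every record carrying the same key
// lands in the same partition.
object RangeBoundsSketch {
  def main(args: Array[String]): Unit = {
    // Assumed bounds from sampling: with our data the sample is almost all 0s,
    // so a single bound of 0 separates the two partitions.
    val rangeBounds = Array(0) // upper bound (inclusive) of partition 0

    def getPartition(key: Int): Int = {
      var p = 0
      while (p < rangeBounds.length && key > rangeBounds(p)) p += 1
      p
    }

    val data = (1 to 2000).map(x => (x / 2000, x))
    val counts = data.groupBy { case (k, _) => getPartition(k) }
                     .map { case (p, grp) => p -> grp.size }
    println(counts) // Map(0 -> 1999, 1 -> 1): key 0 stays whole in partition 0
  }
}
{code}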

After reviewing the Pig code 
[WeightedRangePartitioner|https://github.com/apache/pig/blob/40300960d2edaa8097e551dd8692e29d6018ffc4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java],
 I found that it can handle this kind of case. The difference between 
WeightedRangePartitioner and RangePartitioner is that WeightedRangePartitioner 
also considers the weight of each key: if one key accounts for a large share of 
the whole data, it occupies several partitions (for example 7 of 40 partitions), 
not only 1.
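
As a rough illustration of the weighted idea (not Pig's actual code; the partitionsForKey map and the random choice among a key's partitions are assumptions about how per-key weights might be applied), a heavy key is given a range of partitions and its records are spread over that range:
{code}
import scala.util.Random

// Hypothetical sketch of the weighted idea: a key that dominates the data is
// assigned a *range* of partitions instead of a single one, and its records
// are spread across that range.
object WeightedIdeaSketch {
  def main(args: Array[String]): Unit = {
    // Assumed result of sampling: key 0 is ~100% of the data, so it gets both
    // partitions; key 1 is rare, so it gets only the last partition.
    val partitionsForKey = Map(0 -> (0 until 2), 1 -> (1 until 2))
    val rnd = new Random(42)

    def getPartition(key: Int): Int = {
      val candidates = partitionsForKey(key)
      candidates(rnd.nextInt(candidates.size))
    }

    val data = (1 to 2000).map(x => (x / 2000, x))
    val counts = data.groupBy { case (k, _) => getPartition(k) }
                     .map { case (p, grp) => p -> grp.size }
    println(counts) // roughly Map(0 -> ~1000, 1 -> ~1000)
  }
}
{code}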

In order to handle this case, I tried the following workaround: append a random 
number to the key so that keys are distributed evenly, and strip the random 
number from the result afterwards:
{code}
val random = new java.util.Random()
val rdd = sc.parallelize(1.to(2000)).map(x=>((x,random.nextInt()),x))
val numPartitions = 2
val partitioner = new org.apache.spark.RangePartitioner(numPartitions,rdd)
val numsOfPartitions = rdd.keys.map(k => partitioner.getPartition(k)).countByValue()
numsOfPartitions.map(pair => println("key:" + pair._1 + " val:" + pair._2))
// strip the random salt from the key again
val newRdd = rdd.map(p => (p._1._1, p._2))
{code}
{code}
The result (key = partition id, val = number of records):
key:0   val:999
key:1   val:1001
{code}
It shows that the 2000 elements are distributed evenly across the 2 partitions 
with this workaround. But we *cannot* use it: as people in the community have 
pointed out, it can produce *duplicated* and *missing* data if Spark tasks are 
retried after a failure, because the random salts are not deterministic across 
retries.
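
To spell out the failure mode (a conceptual sketch only, not Spark code; the hash-based routing below stands in for the range partitioner on the salted key), a re-executed task computes a different salt for the same record, so the record can be routed to a different partition than in the first attempt:
{code}
import scala.util.Random

// Conceptual sketch: the same record, processed by two attempts of the same
// task, gets different random salts and therefore may be routed to different
// partitions. A downstream partition that already fetched the first attempt's
// output can then see the record twice, or not at all.
object SaltRetrySketch {
  def main(args: Array[String]): Unit = {
    val record = 42
    val numPartitions = 2

    def partitionOnAttempt(): Int = {
      val salt = new Random().nextInt() // no fixed seed: differs per attempt
      java.lang.Math.floorMod((record, salt).hashCode, numPartitions)
    }

    val first = partitionOnAttempt()
    val retry = partitionOnAttempt()
    println(s"first attempt -> partition $first, retry -> partition $retry")
  }
}
{code}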

My question is:
Is there a better way to handle the skewed-key sort case in Spark? [~vanzin], as 
you are a specialist in Spark, could you help take a look at this issue?




> Optimize sort case when data is skewed
> --------------------------------------
>
>                 Key: PIG-5029
>                 URL: https://issues.apache.org/jira/browse/PIG-5029
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-5029.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using 
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The pig physical plan will be changed to spark plan and to spark lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter 
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at 
> map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use 
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
>  to implement the sort feature. Although 
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
>  is used by RDD.sortByKey and RangePartitioner samples the data to split the 
> keys into roughly equal ranges, the test result (attached document) shows that 
> one partition loads most of the keys and takes a long time to finish.


