[
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624556#comment-15624556
]
liyunzhang_intel commented on PIG-5029:
---------------------------------------
[~knoguchi]:
Before we discussed a lot about the skewed key sort issue in spark mode.
Before i proposed the PIG-5029_1.patch and append a random integer to the
original key thus to make the skewed key distributed evenly([salted key
solution
|http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska].
You mentioned some issues this patch brings such as duplicated and missing
results. Later [~tgraves] commented that
{quote}
I'm assuming in the presentation they are using a salt key such that it
generates the same random if the map task is reran like Koji mentioned. For
instance can you use the task id (not attempt) such that the key is always the
same if it re-runs.
{quote}
in PIG-5051_5029_5.patch
So now i generate the random by {noformat} new Random(seed) {noformat}. Here
the value of {noformat}seed{noformat} is
PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX); thus the
generated random integers in different task attempts of a spark task are same
if retries happens because the value of
PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX) are same.
Do you think this solution is ok? If you think it is ok. [~kexianda] will
start to review this patch.
Thanks [~knoguchi]'s patience on this jira again and this is a really
interesting issue.
> Optimize sort case when data is skewed
> --------------------------------------
>
> Key: PIG-5029
> URL: https://issues.apache.org/jira/browse/PIG-5029
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5029.patch, PIG-5029_2.patch, PIG-5029_3.patch,
> SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
> as (user, action, timespent, query_term, ip_addr, timestamp,
> estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The pig physical plan will be changed to spark plan and to spark lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at
> map at StoreConverter.java:80 []
> | MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
> | ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
> +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
> | MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
> | MapPartitionsRDD[1] at map at LoadConverter.java:127 []
> | NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
> to implement the sort feature. Although
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
> is used by RDD.sortByKey and RangePartitiner will sample data and ranges the
> key roughly into equal range, the test result(attached document) shows that
> one partition will load most keys and take long time to finish.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)