[ https://issues.apache.org/jira/browse/PIG-4553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4553:
----------------------------------
    Attachment:     (was: PIG-4553.patch)

> Implement secondary sort using 1 shuffle not twice
> --------------------------------------------------
>
>                 Key: PIG-4553
>                 URL: https://issues.apache.org/jira/browse/PIG-4553
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>
> Currently we implement the secondary key sort in
> GlobalRearrangeConverter#convert with two shuffles: the first shuffle happens in
> repartitionAndSortWithinPartitions and the second in groupBy.
> {code}
> public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
>                           POGlobalRearrangeSpark physicalOperator) throws IOException {
>     ....
>     if (predecessors.size() == 1) {
>         // GROUP
>         JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
>         if (physicalOperator.isUseSecondaryKey()) {
>             RDD<Tuple> rdd = predecessors.get(0);
>             RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
>                     SparkUtil.<Tuple, Object>getTuple2Manifest());
>             JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
>                     SparkUtil.getManifest(Tuple.class),
>                     SparkUtil.getManifest(Object.class));
>             // first sort the tuples by the secondary key if useSecondaryKey is enabled
>             JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
>                     new HashPartitioner(parallelism),
>                     new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder())); // first shuffle
>             JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
>             prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism); // second shuffle
>         } else {
>             JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
>             prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
>         }
>         JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
>         return jrdd2.rdd();
>     }
>     ....
> }
> {code}
> We can optimize this to use a single shuffle, following the approach in
> https://github.com/tresata/spark-sorted.
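> A minimal sketch of that single-shuffle idea, using plain Spark Java types instead of Pig's Tuple
> and the existing ToKeyNullValueFunction/GetKeyFunction (CompositeKey, GroupKeyPartitioner and
> groupWithSecondarySort are hypothetical names for illustration, and the mapPartitions lambda
> assumes the Spark 2.x Java API where the function returns an Iterator): partition by the group
> key only, sort within each partition by (group key, secondary key), then build the groups with a
> sequential scan, so groupBy and its second shuffle go away.
> {code}
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.Comparator;
> import java.util.List;
> import org.apache.spark.HashPartitioner;
> import org.apache.spark.Partitioner;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaRDD;
> import scala.Tuple2;
>
> public class SingleShuffleSecondarySort {
>
>     /** Composite key: groupKey decides the partition, sortKey only orders within it. */
>     public static final class CompositeKey implements Serializable {
>         final String groupKey;  // the "main" group key
>         final int sortKey;      // the secondary sort key
>         public CompositeKey(String groupKey, int sortKey) {
>             this.groupKey = groupKey;
>             this.sortKey = sortKey;
>         }
>     }
>
>     /** Route records by the group key only, so one group never spans partitions. */
>     static final class GroupKeyPartitioner extends Partitioner {
>         private final HashPartitioner delegate;
>         GroupKeyPartitioner(int partitions) { this.delegate = new HashPartitioner(partitions); }
>         @Override public int numPartitions() { return delegate.numPartitions(); }
>         @Override public int getPartition(Object key) {
>             return delegate.getPartition(((CompositeKey) key).groupKey);
>         }
>     }
>
>     /** Order by group key first, then by the secondary key. */
>     static final class CompositeKeyComparator implements Comparator<CompositeKey>, Serializable {
>         @Override public int compare(CompositeKey a, CompositeKey b) {
>             int c = a.groupKey.compareTo(b.groupKey);
>             return c != 0 ? c : Integer.compare(a.sortKey, b.sortKey);
>         }
>     }
>
>     public static JavaRDD<Tuple2<String, List<String>>> groupWithSecondarySort(
>             JavaPairRDD<CompositeKey, String> input, int parallelism) {
>         // The only shuffle: partition by group key, sort by (group key, secondary key).
>         JavaPairRDD<CompositeKey, String> sorted = input.repartitionAndSortWithinPartitions(
>                 new GroupKeyPartitioner(parallelism), new CompositeKeyComparator());
>         // Records of one group are now adjacent and already in secondary-key order,
>         // so grouping is a plain sequential scan -- no groupBy, no second shuffle.
>         return sorted.mapPartitions(it -> {
>             List<Tuple2<String, List<String>>> out = new ArrayList<>();
>             String current = null;
>             List<String> values = null;
>             while (it.hasNext()) {
>                 Tuple2<CompositeKey, String> rec = it.next();
>                 if (current == null || !current.equals(rec._1().groupKey)) {
>                     if (current != null) out.add(new Tuple2<>(current, values));
>                     current = rec._1().groupKey;
>                     values = new ArrayList<>();
>                 }
>                 values.add(rec._2());
>             }
>             if (current != null) out.add(new Tuple2<>(current, values));
>             return out.iterator();
>         });
>     }
> }
> {code}
> Note that this sketch still buffers each group's values in a list, as groupBy does;
> spark-sorted itself also offers streaming per-key operations so the values need not be
> materialized at all.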



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
