[ 
https://issues.apache.org/jira/browse/PIG-4504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14541566#comment-14541566
 ] 

liyunzhang_intel commented on PIG-4504:
---------------------------------------

[~mohitsabharwal]:
Currently we shuffle twice in the following code:
GlobalRearrangeConverter#convert
{code}
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
        POGlobalRearrangeSpark physicalOperator) throws IOException {
    ....
    if (predecessors.size() == 1) {
        // GROUP
        JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
        if (physicalOperator.isUseSecondaryKey()) {
            RDD<Tuple> rdd = predecessors.get(0);
            RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
                    SparkUtil.<Tuple, Object>getTuple2Manifest());

            JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
                    SparkUtil.getManifest(Tuple.class),
                    SparkUtil.getManifest(Object.class));

            // first sort the tuples by secondary key if useSecondaryKey is enabled
            JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
                    new HashPartitioner(parallelism),
                    new PigSecondaryKeyComparatorSpark(
                            physicalOperator.getSecondarySortOrder())); // first shuffle
            JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
            prdd = mapped.groupBy(new GetKeyFunction(physicalOperator),
                    parallelism); // second shuffle
        } else {
            JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
            prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
        }

        JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
        return jrdd2.rdd();
    }
    ....
}
{code}
We can optimize this later by following the approach in 
https://github.com/tresata/spark-sorted. For now, the secondary key sort 
functionality is complete, although a repetitive shuffle is executed.
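The single-shuffle idea behind spark-sorted can be sketched without Spark (all class and method names below are hypothetical stand-ins, not Pig or spark-sorted APIs): partition by the first key only, sort each partition once by the composite (first, secondary) key, then group consecutive records that share a first key. One partition-and-sort step replaces the repartitionAndSortWithinPartitions-plus-groupBy pair above.

```java
import java.util.*;

// Hypothetical stand-in illustrating the spark-sorted idea in plain Java.
public class SingleShuffleSketch {
    // A record with a grouping (first) key and a secondary sort key.
    record Rec(String key, int secondary) {}

    public static Map<String, List<Integer>> groupSorted(List<Rec> input, int parallelism) {
        // "Shuffle": assign each record to a partition by hash of the first key only,
        // so all records for a given key land in the same partition.
        List<List<Rec>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) partitions.add(new ArrayList<>());
        for (Rec r : input) {
            partitions.get(Math.floorMod(r.key().hashCode(), parallelism)).add(r);
        }
        // Sort each partition once by the composite (first, secondary) key; grouping
        // then becomes a streaming pass over the sorted partition, with no second shuffle.
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (List<Rec> part : partitions) {
            part.sort(Comparator.comparing(Rec::key).thenComparingInt(Rec::secondary));
            for (Rec r : part) {
                out.computeIfAbsent(r.key(), k -> new ArrayList<>()).add(r.secondary());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> data = List.of(new Rec("a", 3), new Rec("b", 1),
                new Rec("a", 1), new Rec("b", 2));
        Map<String, List<Integer>> grouped = groupSorted(data, 2);
        // Values within each group arrive in secondary-key order.
        System.out.println(grouped.get("a")); // [1, 3]
        System.out.println(grouped.get("b")); // [1, 2]
    }
}
```

In Spark terms this corresponds to a single repartitionAndSortWithinPartitions call whose partitioner hashes only the first key while the comparator orders by the full composite key, as spark-sorted does.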

> Enable Secondary key sort feature in spark mode
> -----------------------------------------------
>
>                 Key: PIG-4504
>                 URL: https://issues.apache.org/jira/browse/PIG-4504
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>         Attachments: PIG-4504.patch, PIG-4504_2.patch, PIG-4504_3.patch, 
> PIG-4504_4.patch, PIG-4504_5.patch, PIG-4504_6.patch, PIG-4504_7.patch, 
> SecondaryKeySort_design_doc (1).docx, 
> Why_need_split_PoLocalRearrange_POGlobalRearrange_POPackage_into_two_SparkNodes_in_sparkPlan.docx
>
>
> *Some knowledge about secondary key sort:*
> The MapReduce framework automatically sorts the keys generated by mappers. This 
> means that, before the reducers start, all intermediate (key, value) pairs 
> generated by the mappers must be sorted by key (and not by value). The values 
> passed to each reducer are not sorted at all and can arrive in any order. But 
> we can make (key, value) a compound key, changing each (key, value) pair into 
> a ((key, value), null) pair. Here we call (key, value) the compound key: key 
> is the first key and value is the secondary key. During the shuffle, pairs 
> with the same first key are grouped into the same partition by setting 
> PartitionerClass in the JobConf. Pairs with the same first key but different 
> secondary keys are sorted during the shuffle by setting SortComparatorClass 
> in the JobConf, and are delivered to the same reduce call by setting 
> GroupingComparatorClass in the JobConf. 
> *How does Pig implement secondary key sort in MapReduce mode?*
> In MR, it implements secondary key sort by setting GroupingComparatorClass, 
> PartitionerClass and SortComparatorClass in 
> [JobControlCompiler#getJob|https://github.com/kellyzly/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java#L915]
> *An example that uses secondary key sort:*
> TestAccumulator#testAccumWithSort
> Currently, the secondary key sort feature is not implemented in Spark mode.
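The compound-key scheme described in the quoted text can be simulated in plain Java (hypothetical names, no Hadoop dependency): sort all pairs by the full compound key, the role of SortComparatorClass, then group by the first key alone, the role of GroupingComparatorClass, so each reduce call sees its secondary keys already in order.

```java
import java.util.*;

// Minimal simulation of MR-style secondary key sort; names are illustrative.
public class CompoundKeyDemo {
    // A ((key, value), null) pair: first is the grouping key, secondary the sort key.
    record Compound(String first, int secondary) {}

    static Map<String, List<Integer>> sortAndGroup(List<Compound> pairs) {
        List<Compound> sorted = new ArrayList<>(pairs);
        // SortComparatorClass analogue: order by first key, then by secondary key.
        sorted.sort(Comparator.comparing(Compound::first)
                .thenComparingInt(Compound::secondary));
        // GroupingComparatorClass analogue: pairs with equal first keys form one
        // reduce call; their secondary keys arrive already sorted.
        Map<String, List<Integer>> reduceInput = new LinkedHashMap<>();
        for (Compound c : sorted) {
            reduceInput.computeIfAbsent(c.first(), k -> new ArrayList<>()).add(c.secondary());
        }
        return reduceInput;
    }

    public static void main(String[] args) {
        List<Compound> pairs = List.of(
                new Compound("x", 2), new Compound("y", 9),
                new Compound("x", 1), new Compound("y", 4));
        System.out.println(sortAndGroup(pairs)); // {x=[1, 2], y=[4, 9]}
    }
}
```

In a real MR job the partitioning step (PartitionerClass hashing only the first key) happens between these two analogues; it is omitted here since a single in-memory list behaves like one partition.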



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)