liyunzhang_intel created PIG-4553:
-------------------------------------

             Summary: Implement secondary sort using 1 shuffle not twice
                 Key: PIG-4553
                 URL: https://issues.apache.org/jira/browse/PIG-4553
             Project: Pig
          Issue Type: Sub-task
            Reporter: liyunzhang_intel


Currently, secondary key sort is implemented in GlobalRearrangeConverter#convert with two shuffles: the first in repartitionAndSortWithinPartitions and the second in groupBy.
{code}
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                          POGlobalRearrangeSpark physicalOperator) throws IOException {
    ....
    if (predecessors.size() == 1) {
        // GROUP
        JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
        if (physicalOperator.isUseSecondaryKey()) {
            RDD<Tuple> rdd = predecessors.get(0);
            RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
                    SparkUtil.<Tuple, Object>getTuple2Manifest());

            JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
                    SparkUtil.getManifest(Tuple.class),
                    SparkUtil.getManifest(Object.class));

            // First sort the tuples by secondary key if useSecondaryKey sort is enabled
            JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
                    new HashPartitioner(parallelism),
                    new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder())); // first shuffle
            JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
            prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism); // second shuffle
        } else {
            JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
            prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
        }

        JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
        return jrdd2.rdd();
    }
    ....
}
{code}
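We can optimize this to use a single shuffle, following the approach in https://github.com/tresata/spark-sorted: partition the records by the main key only and sort within each partition by (main key, secondary key). All records for a given key then arrive contiguously in one partition and can be grouped in a single pass over that partition, so the second groupBy shuffle is no longer needed.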
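A minimal in-memory sketch of the single-shuffle idea, in plain Java (16+, for records) rather than Spark, so it runs without a cluster. Everything here is hypothetical and for illustration only: `Rec`, `Group`, `partition`, `groupSorted` and `run` are not Pig or Spark APIs. The `partition` step stands in for the one shuffle (routing by the hash of the main key only); `groupSorted` stands in for what a `mapPartitions` over the output of `repartitionAndSortWithinPartitions` could do, under the assumption that the partitioner looks only at the main key while the comparator orders by (main key, secondary key).

{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of secondary sort with one shuffle; not Pig or Spark API.
public class OneShuffleSecondarySort {

    // A record with a main (group) key, a secondary (sort) key and a payload.
    record Rec(String key, int secondary, String value) {}

    // One output group: the main key plus its values in secondary-key order.
    record Group(String key, List<String> values) {}

    // The single "shuffle": route each record by the hash of its MAIN key only,
    // so every record of a group lands in the same partition.
    static List<List<Rec>> partition(List<Rec> input, int numPartitions) {
        List<List<Rec>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (Rec r : input) {
            parts.get(Math.floorMod(r.key().hashCode(), numPartitions)).add(r);
        }
        return parts;
    }

    // Sort one partition by (main key, secondary key), then group adjacent
    // records that share a main key in a single pass; no second shuffle.
    static List<Group> groupSorted(List<Rec> partition) {
        List<Rec> sorted = new ArrayList<>(partition);
        sorted.sort(Comparator.comparing(Rec::key).thenComparingInt(Rec::secondary));
        List<Group> groups = new ArrayList<>();
        for (Rec r : sorted) {
            if (groups.isEmpty() || !groups.get(groups.size() - 1).key().equals(r.key())) {
                groups.add(new Group(r.key(), new ArrayList<>()));
            }
            groups.get(groups.size() - 1).values().add(r.value());
        }
        return groups;
    }

    // Partition once, then group each partition locally.
    static List<Group> run(List<Rec> input, int numPartitions) {
        List<Group> out = new ArrayList<>();
        for (List<Rec> part : partition(input, numPartitions)) {
            out.addAll(groupSorted(part));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
                new Rec("a", 3, "a3"), new Rec("b", 1, "b1"),
                new Rec("a", 1, "a1"), new Rec("b", 2, "b2"),
                new Rec("a", 2, "a2"));
        for (Group g : run(input, 2)) {
            System.out.println(g.key() + " -> " + g.values());
        }
    }
}
{code}

Each key's values come out ordered by the secondary key even though the input was unordered. The design point is that the partitioner and the sort comparator see different parts of the key: partitioning on the main key alone is what guarantees a group never spans two partitions, which is the property the current code lacks when it hashes the whole key and then needs groupBy to bring the pieces back together.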



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)