[ https://issues.apache.org/jira/browse/PIG-4553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
liyunzhang_intel updated PIG-4553:
----------------------------------
    Attachment:     (was: PIG-4553.patch)

> Implement secondary sort using 1 shuffle not twice
> --------------------------------------------------
>
>                 Key: PIG-4553
>                 URL: https://issues.apache.org/jira/browse/PIG-4553
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>
> Secondary key sort is currently implemented in GlobalRearrangeConverter#convert with two shuffles: the first in repartitionAndSortWithinPartitions, the second in groupBy.
> {code}
> public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
>                           POGlobalRearrangeSpark physicalOperator) throws IOException {
>     ....
>     if (predecessors.size() == 1) {
>         // GROUP
>         JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
>         if (physicalOperator.isUseSecondaryKey()) {
>             RDD<Tuple> rdd = predecessors.get(0);
>             RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
>                     SparkUtil.<Tuple, Object>getTuple2Manifest());
>             JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
>                     SparkUtil.getManifest(Tuple.class),
>                     SparkUtil.getManifest(Object.class));
>             // first sort the tuples by secondary key if useSecondaryKey is enabled
>             JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
>                     new HashPartitioner(parallelism),
>                     new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder())); // first shuffle
>             JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
>             prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism); // second shuffle
>         } else {
>             JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
>             prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
>         }
>         JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
>         return jrdd2.rdd();
>     }
>     ....
> }
> {code}
> We can optimize this to use a single shuffle, following the approach in https://github.com/tresata/spark-sorted.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
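The single-shuffle idea referenced above can be illustrated without Spark. Below is a minimal plain-Java sketch (the class name `SecondarySortSketch` and the `Rec` record are hypothetical, not part of the Pig or spark-sorted code): records are partitioned by the primary key only, each partition is sorted once by the composite (primary, secondary) key, and because equal primary keys are then contiguous, groups can be read off in a single scan with no second groupBy shuffle. This mirrors what repartitionAndSortWithinPartitions accomplishes in one shuffle.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of single-shuffle secondary sort (the spark-sorted idea),
// simulated without Spark for illustration only.
public class SecondarySortSketch {
    // A record with a primary (grouping) key and a secondary (sort) key.
    record Rec(String key, int secondary) {}

    public static Map<String, List<Integer>> groupSorted(List<Rec> input, int numPartitions) {
        // "Shuffle": assign each record to a partition by primary-key hash,
        // analogous to HashPartitioner(parallelism).
        List<List<Rec>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayList<>());
        for (Rec r : input) {
            int p = Math.floorMod(r.key().hashCode(), numPartitions);
            partitions.get(p).add(r);
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (List<Rec> part : partitions) {
            // Sort within each partition by the composite (primary, secondary) key,
            // analogous to repartitionAndSortWithinPartitions with a secondary-key comparator.
            part.sort(Comparator.comparing(Rec::key).thenComparingInt(Rec::secondary));
            // Equal primary keys are now contiguous, so groups (with values already
            // in secondary-key order) fall out of one sequential scan -- no second shuffle.
            for (Rec r : part) {
                result.computeIfAbsent(r.key(), k -> new ArrayList<>()).add(r.secondary());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
                new Rec("a", 3), new Rec("b", 1), new Rec("a", 1),
                new Rec("b", 2), new Rec("a", 2));
        System.out.println(groupSorted(input, 2)); // prints {b=[1, 2], a=[1, 2, 3]}
    }
}
```

In the real converter, the second shuffle exists because groupBy rehashes records that repartitionAndSortWithinPartitions has already placed on the correct partition; reading groups off the sorted partitions directly, as sketched here, removes that redundant pass.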