[ https://issues.apache.org/jira/browse/PIG-4577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570244#comment-14570244 ]
Mohit Sabharwal commented on PIG-4577:
--------------------------------------

+1 (non-binding)

> Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java
> --------------------------------------------------------------------------------------------------
>
>                 Key: PIG-4577
>                 URL: https://issues.apache.org/jira/browse/PIG-4577
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4577.patch
>
>
> In PIG-4565 (Support custom MR partitioners for Spark engine), we refined
> GlobalRearrangeConverter to use the "cogroup" Spark API for the "groupby" and
> "join" cases, except for the "groupby+secondarysort" case.
> From PIG-4565_2.patch, GlobalRearrangeConverter.java:
> {code}
> @Override
> public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
>         POGlobalRearrangeSpark op) throws IOException {
>     SparkUtil.assertPredecessorSizeGreaterThan(predecessors, op, 0);
>     int parallelism = SparkUtil.getParallelism(predecessors, op);
>
>     // TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
>     // vs using groupBy (like we do in this commented code), vs using
>     // reduceByKey(). This is a pending task in Pig on Spark Milestone 1
>     // Once we figure that out, we can allow custom partitioning for
>     // secondary sort case as well.
>     // if (predecessors.size() == 1) {
>     //     // GROUP BY
>     //     JavaPairRDD<Object, Iterable<Tuple>> prdd;
>     //     if (op.isUseSecondaryKey()) {
>     //         prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
>     //     } else {
>     //         JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
>     //         prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
>     //         prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
>     //                 parallelism));
>     //     }
>     //     JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
>     //     return jrdd2.rdd();
>     //
>     // if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
>     //     return handleSecondarySort(predecessors.get(0), op, parallelism);
>     // }
>     if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
>         return handleSecondarySort(predecessors.get(0), op, parallelism);
>     }
>
>     List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
>     for (RDD<Tuple> rdd : predecessors) {
>         JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
>         JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
>         rddPairs.add(rddPair.rdd());
>     }
>
>     // Something's wrong with the type parameters of CoGroupedRDD
>     // key and value are the same type ???
>     CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
>             (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
>                     .asScalaBuffer(rddPairs).toSeq()),
>             SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism)
>     );
>
>     RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
>             (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
>     return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
> }
> {code}
> Actually, we can also use the "cogroup" Spark API to implement the
> "secondarysort+groupby" case.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
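The issue's closing suggestion, routing the "groupby+secondarysort" case through cogroup as well, can be illustrated with a local, Spark-free sketch. It is not the code in PIG-4577.patch: the class `CoGroupSecondarySortSketch`, the `Rec` type and all method names are hypothetical. `cogroup` only mimics the shape of `CoGroupedRDD` output (for each key, one value list per input), `hashPartition` follows the non-negative `hashCode` modulo used by Spark's `HashPartitioner`, and the `partitioner` argument marks where a custom partitioner would plug in, which is what the TODO in the quoted code asks for in the secondary-sort case. Sorting each group in memory after grouping is an assumption made to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

// Hypothetical, Spark-free sketch: cogroup-style grouping, then an in-memory
// sort of every group on a secondary key. Not Pig's implementation.
class CoGroupSecondarySortSketch {

    // Hypothetical record: main (group) key, secondary sort key, payload.
    static final class Rec {
        final String groupKey;
        final int sortKey;
        final String payload;

        Rec(String groupKey, int sortKey, String payload) {
            this.groupKey = groupKey;
            this.sortKey = sortKey;
            this.payload = payload;
        }
    }

    // Same rule as Spark's HashPartitioner: non-negative hashCode modulo numPartitions.
    static int hashPartition(String key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Mimics the shape of CoGroupedRDD output: partition -> key -> one list per input.
    // 'partitioner' stands in for a (possibly custom) Spark Partitioner.
    static List<TreeMap<String, List<List<Rec>>>> cogroup(
            List<List<Rec>> inputs, ToIntFunction<String> partitioner, int numPartitions) {
        List<TreeMap<String, List<List<Rec>>>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new TreeMap<>());
        }
        for (int i = 0; i < inputs.size(); i++) {
            for (Rec r : inputs.get(i)) {
                TreeMap<String, List<List<Rec>>> part =
                        partitions.get(partitioner.applyAsInt(r.groupKey));
                List<List<Rec>> perInput = part.get(r.groupKey);
                if (perInput == null) {
                    // First time this key is seen: one empty list per input.
                    perInput = new ArrayList<>();
                    for (int j = 0; j < inputs.size(); j++) {
                        perInput.add(new ArrayList<>());
                    }
                    part.put(r.groupKey, perInput);
                }
                perInput.get(i).add(r);
            }
        }
        return partitions;
    }

    // groupby + secondarysort on one input: cogroup it alone, then sort each
    // group's values by the secondary key. Returns key -> payloads in sorted order.
    static Map<String, List<String>> groupBySecondarySort(List<Rec> input, int numPartitions) {
        List<TreeMap<String, List<List<Rec>>>> partitions = cogroup(
                Collections.singletonList(input), k -> hashPartition(k, numPartitions), numPartitions);
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (TreeMap<String, List<List<Rec>>> part : partitions) {
            for (Map.Entry<String, List<List<Rec>>> e : part.entrySet()) {
                List<Rec> bag = e.getValue().get(0); // single input, so index 0
                bag.sort(Comparator.comparingInt((Rec r) -> r.sortKey));
                List<String> payloads = new ArrayList<>();
                for (Rec r : bag) {
                    payloads.add(r.payload);
                }
                result.put(e.getKey(), payloads);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rec> input = new ArrayList<>();
        input.add(new Rec("b", 3, "b3"));
        input.add(new Rec("a", 2, "a2"));
        input.add(new Rec("b", 1, "b1"));
        input.add(new Rec("a", 5, "a5"));
        input.add(new Rec("a", 1, "a1"));
        // prints {b=[b1, b3], a=[a1, a2, a5]}
        System.out.println(groupBySecondarySort(input, 2));
    }
}
```

Sorting after the shuffle keeps every group in memory at once; a shuffle that sorts on a composite (group key, secondary key), as Spark's `repartitionAndSortWithinPartitions` allows, would avoid that, at the cost of a more involved key and partitioner.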