[
https://issues.apache.org/jira/browse/PIG-4504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14541566#comment-14541566
]
liyunzhang_intel commented on PIG-4504:
---------------------------------------
[~mohitsabharwal]:
Currently we shuffle twice in the following code:
GlobalRearrangeConverter#convert
{code}
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
        POGlobalRearrangeSpark physicalOperator) throws IOException {
    ....
    if (predecessors.size() == 1) {
        // GROUP
        JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
        if (physicalOperator.isUseSecondaryKey()) {
            RDD<Tuple> rdd = predecessors.get(0);
            RDD<Tuple2<Tuple, Object>> rddPair =
                    rdd.map(new ToKeyNullValueFunction(),
                            SparkUtil.<Tuple, Object>getTuple2Manifest());
            JavaPairRDD<Tuple, Object> pairRDD =
                    new JavaPairRDD<Tuple, Object>(rddPair,
                            SparkUtil.getManifest(Tuple.class),
                            SparkUtil.getManifest(Object.class));
            // first sort the tuples by the secondary key if useSecondaryKey is enabled
            JavaPairRDD<Tuple, Object> sorted =
                    pairRDD.repartitionAndSortWithinPartitions(
                            new HashPartitioner(parallelism),
                            new PigSecondaryKeyComparatorSpark(
                                    physicalOperator.getSecondarySortOrder()));
            // first shuffle
            JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
            prdd = mapped.groupBy(new GetKeyFunction(physicalOperator),
                    parallelism); // second shuffle
        } else {
            JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
            prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator),
                    parallelism);
        }
        JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
        return jrdd2.rdd();
    }
    ....
}
{code}
We can optimize this later following the approach in
https://github.com/tresata/spark-sorted. For now, the secondary key sort
functionality is complete, although a redundant shuffle is executed.
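For reference, the single-shuffle idea behind spark-sorted is: partition on the first key only, sort within each partition by the compound (first, secondary) key, then group consecutive tuples that share the first key with a linear scan in mapPartitions. A minimal plain-Java sketch of that per-partition grouping step (class and field names here are illustrative, not from the patch):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;

public class SingleShuffleSketch {
    // Hypothetical compound key: "first" is the group key, "second" the secondary sort key.
    static final class CompoundKey {
        final String first;
        final int second;
        CompoundKey(String first, int second) { this.first = first; this.second = second; }
    }

    // Compound sort order: first key, then secondary key.
    static final Comparator<CompoundKey> COMPOUND_ORDER =
            Comparator.comparing((CompoundKey k) -> k.first)
                      .thenComparingInt(k -> k.second);

    // After a single repartitionAndSortWithinPartitions (partitioned on the first
    // key only, sorted by COMPOUND_ORDER), each partition can be grouped by a
    // plain linear scan inside mapPartitions -- no second shuffle is needed.
    static LinkedHashMap<String, List<Integer>> groupSortedPartition(List<CompoundKey> partition) {
        partition.sort(COMPOUND_ORDER); // stands in for the within-partition sort
        LinkedHashMap<String, List<Integer>> groups = new LinkedHashMap<>();
        for (CompoundKey k : partition) {
            groups.computeIfAbsent(k.first, f -> new ArrayList<>()).add(k.second);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<CompoundKey> partition = new ArrayList<>(Arrays.asList(
                new CompoundKey("b", 2), new CompoundKey("a", 3),
                new CompoundKey("a", 1), new CompoundKey("b", 1)));
        // Values within each group come out sorted by the secondary key.
        System.out.println(groupSortedPartition(partition)); // prints {a=[1, 3], b=[1, 2]}
    }
}
```

Because the shuffle already colocated and ordered everything by the compound key, the grouping itself stays a narrow (per-partition) operation.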
> Enable Secondary key sort feature in spark mode
> -----------------------------------------------
>
> Key: PIG-4504
> URL: https://issues.apache.org/jira/browse/PIG-4504
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Attachments: PIG-4504.patch, PIG-4504_2.patch, PIG-4504_3.patch,
> PIG-4504_4.patch, PIG-4504_5.patch, PIG-4504_6.patch, PIG-4504_7.patch,
> SecondaryKeySort_design_doc (1).docx,
> Why_need_split_PoLocalRearrange_POGlobalRearrange_POPackage_into_two_SparkNodes_in_sparkPlan.docx
>
>
> *Some knowledge about secondary key sort:*
> The MapReduce framework automatically sorts the keys generated by mappers. This
> means that, before the reducers start, all intermediate (key, value) pairs
> generated by mappers are sorted by key (not by value). The values passed to
> each reducer are not sorted and can arrive in any order. But we can make
> (key, value) a compound key, so that the (key, value) pairs become
> ((key, value), null) pairs. Here we call (key, value) the compound key: key is
> the first key and value is the secondary key. In the shuffle process, pairs
> with the same first key are grouped into the same partition by setting
> PartitionerClass in the JobConf. Pairs with the same first key but different
> secondary keys are sorted during the shuffle by setting SortComparatorClass in
> the JobConf. Pairs with the same first key but different secondary keys are
> passed to the same reduce function by setting GroupingComparatorClass in the
> JobConf.
> *How does Pig implement secondary key sort in MapReduce mode?*
> In MR, it implements secondary key sort by setting GroupingComparatorClass,
> PartitionerClass, and SortComparatorClass in
> [JobControlCompiler#getJob|https://github.com/kellyzly/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java#L915]
> *An example that uses secondary key sort:*
> TestAccumulator#testAccumWithSort
> Currently, the secondary key sort feature is not implemented in Spark mode.
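To make the three JobConf hooks in the description above concrete, here is a minimal plain-Java sketch of the roles they play on a compound key (the names are illustrative analogues, not Pig's actual classes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class CompoundKeyRoles {
    // ((key, value), null): "first" drives partitioning and grouping,
    // "second" only drives the order within a group.
    static final class Pair {
        final String first;
        final int second;
        Pair(String first, int second) { this.first = first; this.second = second; }
        @Override public String toString() { return "(" + first + "," + second + ")"; }
    }

    // PartitionerClass analogue: route on the first key only, so every pair
    // with the same first key reaches the same partition (and thus reducer).
    static int partition(Pair p, int numPartitions) {
        return (p.first.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // SortComparatorClass analogue: full compound order (first key, then secondary key).
    static final Comparator<Pair> SORT =
            Comparator.comparing((Pair p) -> p.first).thenComparingInt(p -> p.second);

    // GroupingComparatorClass analogue: compare the first key only, so one
    // reduce() call sees all secondary keys for that first key, already sorted.
    static final Comparator<Pair> GROUP = Comparator.comparing((Pair p) -> p.first);

    public static void main(String[] args) {
        List<Pair> pairs = new ArrayList<>(Arrays.asList(
                new Pair("k1", 5), new Pair("k2", 1), new Pair("k1", 2)));
        pairs.sort(SORT);
        System.out.println(pairs); // prints [(k1,2), (k1,5), (k2,1)]
        // GROUP treats (k1,2) and (k1,5) as equal, so both reach one reduce call.
        System.out.println(GROUP.compare(pairs.get(0), pairs.get(1)) == 0); // prints true
    }
}
```

The point is that the same compound key is interpreted three different ways: fully for sorting, and by its first component only for both partitioning and grouping.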
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)