There seems to be a way around it. I created a ShuffledRDD with the partitioner in my code and used setKeyOrdering on it. It worked!
new ShuffledRDD[String, String, String](pairRDD, new StraightPartitioner(10))
  .setKeyOrdering(new scala.math.Ordering[String] {
    def compare(x: String, y: String) = x compare y
  })

On Wed, Feb 4, 2015 at 8:39 AM, Imran Rashid <iras...@cloudera.com> wrote:

> I think you are interested in secondary sort, which is still being worked
> on:
>
> https://issues.apache.org/jira/browse/SPARK-3655
>
> On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak <nitinkak...@gmail.com> wrote:
>
>> I thought that's what sort-based shuffle did: sort the keys going to the
>> same partition.
>>
>> I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that
>> the ordering of c2's type is the problem here.
>>
>> On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> Hm, I don't think the sort partitioner is going to cause the result to
>>> be ordered by (c1, c2) if you only partitioned on c1. I mean, it's not
>>> even guaranteed that the type of c2 has an ordering, right?
>>>
>>> On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 <nitinkak...@gmail.com>
>>> wrote:
>>> > I am trying to implement secondary sort in Spark as we do in
>>> > MapReduce.
>>> >
>>> > Here is my data (tab separated; columns c1, c2, c3):
>>> >
>>> > c1  c2  c3
>>> > 1   2   4
>>> > 1   3   6
>>> > 2   4   7
>>> > 2   6   8
>>> > 3   5   5
>>> > 3   1   8
>>> > 3   2   0
>>> >
>>> > To do secondary sort, I create a paired RDD as
>>> >
>>> > ((c1 + "," + c2), row)
>>> >
>>> > and then use a custom partitioner to partition only on c1. I have set
>>> > spark.shuffle.manager = SORT so the keys per partition are sorted.
>>> > For the key "3" I am expecting to get
>>> >
>>> > (3, 1)
>>> > (3, 2)
>>> > (3, 5)
>>> >
>>> > but I am still getting the original order:
>>> >
>>> > 3,5
>>> > 3,1
>>> > 3,2
>>> >
>>> > Here is the custom partitioner code:
>>> >
>>> > class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>>> >   def numPartitions = p
>>> >   def getPartition(key: Any) = {
>>> >     key.asInstanceOf[String].split(",")(0).toInt
>>> >   }
>>> > }
>>> >
>>> > and the driver code; please tell me what I am doing wrong:
>>> >
>>> > val conf = new SparkConf().setAppName("MapInheritanceExample")
>>> > conf.set("spark.shuffle.manager", "SORT")
>>> > val sc = new SparkContext(conf)
>>> > val pF = sc.textFile(inputFile)
>>> >
>>> > val log = LogFactory.getLog("MapFunctionTest")
>>> > val partitionedRDD = pF.map { x =>
>>> >   val arr = x.split("\t")
>>> >   (arr(0) + "," + arr(1), null)
>>> > }.partitionBy(new StraightPartitioner(10))
>>> >
>>> > val outputRDD = partitionedRDD.mapPartitions(p => {
>>> >   p.map { case (o, n) => o }
>>> > })
>>> >
>>> > --
>>> > View this message in context:
>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> > Nabble.com.
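One caveat with the workaround at the top of the thread: it passes a plain lexicographic String ordering to setKeyOrdering, which breaks down as soon as c2 has more than one digit ("3,10" sorts before "3,2" as strings). A minimal plain-Scala sketch of a numeric ordering on the composite "c1,c2" key (no Spark needed to see the difference; the object and value names here are illustrative, not from the thread):

```scala
object SecondarySortOrdering {
  // Order "c1,c2" keys numerically by c1, then by c2, instead of
  // comparing the raw strings character by character.
  val byComponents: Ordering[String] =
    Ordering.by { (key: String) =>
      val Array(c1, c2) = key.split(",").map(_.toInt)
      (c1, c2) // relies on the built-in Ordering for (Int, Int) tuples
    }

  def main(args: Array[String]): Unit = {
    val keys = Seq("3,5", "3,1", "3,10", "3,2")
    println(keys.sorted)               // lexicographic: List(3,1, 3,10, 3,2, 3,5)
    println(keys.sorted(byComponents)) // numeric:       List(3,1, 3,2, 3,5, 3,10)
  }
}
```

An ordering like this passed to setKeyOrdering makes the keys within each partition come out sorted by (c1, c2) as numbers. Since Spark 1.2, repartitionAndSortWithinPartitions on a pair RDD (with a key ordering in implicit scope) does the partition-and-sort in a single step, which is the direction SPARK-3655 builds on.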