This is an excerpt from the design document for the sort-based shuffle implementation. I am thinking I might be wrong in my perception of sort-based shuffle; I don't completely understand it, though.
*Motivation*

A sort-based shuffle can be more scalable than Spark's current hash-based one because it doesn't require writing a separate file for each reduce task from each mapper. Instead, we write a single sorted file and serve ranges of it to different reducers. In jobs with a lot of reduce tasks (say 10,000+), this saves significant memory for compression and serialization buffers and results in more sequential disk I/O.

*Implementation*

To perform a sort-based shuffle, each map task will produce one or more output files sorted by a key's partition ID, then merge-sort them to yield a single output file. Because it's only necessary to group the keys together into partitions, we won't bother to also sort them within each partition.

On Tue, Feb 3, 2015 at 5:41 PM, Nitin kak <nitinkak...@gmail.com> wrote:

> I thought that's what sort-based shuffle did: sort the keys going to the
> same partition.
>
> I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that
> the ordering of the c2 type is the problem here.
>
> On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> Hm, I don't think the sort partitioner is going to cause the result to
>> be ordered by c1, c2 if you only partitioned on c1. I mean, it's not
>> even guaranteed that the type of c2 has an ordering, right?
>>
>> On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 <nitinkak...@gmail.com>
>> wrote:
>>
>> > I am trying to implement secondary sort in Spark as we do in
>> > MapReduce.
>> >
>> > Here is my data (tab separated; the c1 c2 c3 header is not part of
>> > the file):
>> >
>> > c1 c2 c3
>> > 1 2 4
>> > 1 3 6
>> > 2 4 7
>> > 2 6 8
>> > 3 5 5
>> > 3 1 8
>> > 3 2 0
>> >
>> > To do secondary sort, I create a paired RDD as
>> >
>> > ((c1 + "," + c2), row)
>> >
>> > and then use a custom partitioner to partition only on c1. I have set
>> > spark.shuffle.manager = SORT so the keys per partition are sorted.
>> > For the key "3" I am expecting to get
>> >
>> > (3, 1)
>> > (3, 2)
>> > (3, 5)
>> >
>> > but am still getting the original order:
>> >
>> > 3,5
>> > 3,1
>> > 3,2
>> >
>> > Here is the custom partitioner code:
>> >
>> > class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>> >   def numPartitions = p
>> >   def getPartition(key: Any) = {
>> >     key.asInstanceOf[String].split(",")(0).toInt
>> >   }
>> > }
>> >
>> > and the driver code; please tell me what I am doing wrong:
>> >
>> > val conf = new SparkConf().setAppName("MapInheritanceExample")
>> > conf.set("spark.shuffle.manager", "SORT")
>> > val sc = new SparkContext(conf)
>> > val pF = sc.textFile(inputFile)
>> >
>> > val log = LogFactory.getLog("MapFunctionTest")
>> > val partitionedRDD = pF.map { x =>
>> >   val arr = x.split("\t")
>> >   (arr(0) + "," + arr(1), null)
>> > }.partitionBy(new StraightPartitioner(10))
>> >
>> > val outputRDD = partitionedRDD.mapPartitions(p => {
>> >   p.map { case (o, n) => o }
>> > })
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
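To make the design excerpt concrete: here is a standalone sketch (plain Scala, not Spark internals; object and function names are hypothetical) of the map-side behavior it describes. Records are sorted by their target partition ID only, so each partition occupies a contiguous range of one output file, while keys *within* a partition keep their original relative order.

```scala
// Standalone sketch of the map side of a sort-based shuffle (not Spark
// internals; names are hypothetical). Records are sorted by partition ID
// only, so keys inside a partition stay in their original relative order.
object SortShuffleSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    def partitionOf(key: String): Int = math.abs(key.hashCode) % numPartitions

    // (key, value) records as emitted by one map task
    val records = Seq(("c", 1), ("a", 2), ("b", 3), ("a", 4), ("c", 5))

    // Stable sort by partition ID: this groups records into contiguous
    // partition ranges without sorting them by key inside each range.
    val sorted = records.sortBy { case (k, _) => partitionOf(k) }

    sorted.foreach { case (k, v) =>
      println(s"partition ${partitionOf(k)}: ($k, $v)")
    }
  }
}
```

Because Scala's `sortBy` is stable, duplicate keys such as "a" and "c" above come out in the order the map task emitted them, which is exactly why the design excerpt says within-partition order is not guaranteed.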
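For readers hitting the same problem: setting spark.shuffle.manager to SORT changes how the shuffle writes its files, but (as the design excerpt says) it deliberately does not sort keys within a partition. Secondary sort needs an explicit within-partition sort; since Spark 1.2, repartitionAndSortWithinPartitions on a keyed RDD does this. The following plain-Scala sketch (no Spark; names hypothetical) simulates that fix on the sample data from the thread:

```scala
// Plain-Scala simulation (no Spark; names hypothetical) of secondary sort:
// partition on c1 only, then explicitly sort each partition's records by
// the full (c1, c2) key -- the step the thread's driver code is missing.
object SecondarySortSketch {
  def main(args: Array[String]): Unit = {
    // (c1, c2) pairs from the sample data in the thread
    val rows = Seq((1, 2), (1, 3), (2, 4), (2, 6), (3, 5), (3, 1), (3, 2))

    // Step 1: partition on c1 only, as StraightPartitioner does.
    val partitions: Map[Int, Seq[(Int, Int)]] = rows.groupBy(_._1)

    // Step 2: the missing piece -- sort within each partition by (c1, c2),
    // using the natural tuple ordering.
    val sortedWithin = partitions.map { case (p, recs) => p -> recs.sorted }

    println(sortedWithin(3)) // List((3,1), (3,2), (3,5))
  }
}
```

With real RDDs the equivalent is rdd.repartitionAndSortWithinPartitions(partitioner), where the partitioner looks only at c1 and the implicit ordering on the (Int, Int) key drives the per-partition sort during the shuffle itself.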
Attachment: Sort-basedshuffledesign.pdf (Adobe PDF document)