This is an excerpt from the design document of the sort-based shuffle
implementation. I may be wrong in my understanding of sort-based shuffle;
I don't completely understand it yet.

*Motivation*
A sort-based shuffle can be more scalable than Spark’s current hash-based
one because it doesn’t require writing a separate file for each reduce task
from each mapper. Instead, we write a single sorted file and serve ranges
of it to different reducers. In jobs with a lot of reduce tasks (say
10,000+), this saves significant memory for compression and serialization
buffers and results in more sequential disk I/O.

*Implementation*
To perform a sort-based shuffle, each map task will produce one or more
output files sorted by a key’s partition ID, then merge-sort them to yield
a single output file. Because it’s only necessary to group the keys
together into partitions, we won’t bother to also sort them within each
partition.
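
The paragraph above can be sketched in plain Scala (a simplification, not actual Spark internals; object and method names like PartitionSortSketch are hypothetical): a mapper's output is ordered by partition ID only, so each reducer's range is contiguous, but keys inside a partition stay in arrival order.

```scala
// Plain-Scala sketch of map-side output sorted by partition ID only.
// NOT Spark internals; all names here are hypothetical.
object PartitionSortSketch {
  def partitionId(key: String, numPartitions: Int): Int =
    math.abs(key.hashCode) % numPartitions

  // One "sorted file" per mapper: records ordered by partition ID.
  // sortBy is stable, so keys within a partition keep arrival order.
  def mapOutput(records: Seq[(String, Int)], numPartitions: Int): Seq[(String, Int)] =
    records.sortBy { case (k, _) => partitionId(k, numPartitions) }

  // A reducer for partition p reads only its contiguous range of the file.
  def rangeFor(file: Seq[(String, Int)], p: Int, numPartitions: Int): Seq[(String, Int)] =
    file.filter { case (k, _) => partitionId(k, numPartitions) == p }

  def main(args: Array[String]): Unit = {
    val recs = Seq(("b", 1), ("a", 2), ("c", 3), ("a", 4))
    // partition 0 holds "b"; partition 1 holds "a", "c", "a" in arrival order
    println(mapOutput(recs, 2))
  }
}
```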

On Tue, Feb 3, 2015 at 5:41 PM, Nitin kak <nitinkak...@gmail.com> wrote:

> I thought that's what sort-based shuffle did: sort the keys going to the
> same partition.
>
> I have tried (c1, c2) as (Int, Int) tuple as well. I don't think that
> ordering of c2 type is the problem here.
>
> On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> Hm, I don't think the sort partitioner is going to cause the result to
>> be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
>> even guaranteed that the type of c2 has an ordering, right?
>>
>> On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 <nitinkak...@gmail.com>
>> wrote:
>> > I am trying to implement secondary sort in spark as we do in map-reduce.
>> >
>> > Here is my data (tab-separated; the header row c1, c2, c3 is not in the data).
>> > c1    c2     c3
>> > 1       2       4
>> > 1       3       6
>> > 2       4       7
>> > 2       6       8
>> > 3       5       5
>> > 3       1       8
>> > 3       2       0
>> >
>> > To do secondary sort, I create a paired RDD as
>> >
>> > ((c1 + "," + c2), row)
>> >
>> > and then use a custom partitioner to partition only on c1. I have set
>> > spark.shuffle.manager = SORT so the keys per partition are sorted. For the
>> > key "3" I am expecting to get
>> > (3, 1)
>> > (3, 2)
>> > (3, 5)
>> > but still getting the original order
>> > 3,5
>> > 3,1
>> > 3,2
>> >
>> > Here is the custom partitioner code:
>> >
>> > class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>> >   def numPartitions = p
>> >   def getPartition(key: Any) = {
>> >     key.asInstanceOf[String].split(",")(0).toInt
>> >   }
>> > }
>> >
>> > and driver code, please tell me what I am doing wrong
>> >
>> > val conf = new SparkConf().setAppName("MapInheritanceExample")
>> >     conf.set("spark.shuffle.manager", "SORT");
>> >     val sc = new SparkContext(conf)
>> >     val pF = sc.textFile(inputFile)
>> >
>> >     val log = LogFactory.getLog("MapFunctionTest")
>> >     val partitionedRDD = pF.map { x =>
>> >
>> >         var arr = x.split("\t");
>> >         (arr(0)+","+arr(1), null)
>> >
>> >     }.partitionBy(new StraightPartitioner(10))
>> >
>> >     var outputRDD = partitionedRDD.mapPartitions(p => {
>> >       p.map({ case(o, n) => {
>> >                o
>> >         }
>> >       })
>> >     })
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>
>
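
For what it's worth, the usual way to get the ordering Nitin expects (in Spark 1.2+) is to ask for it explicitly with repartitionAndSortWithinPartitions on a composite key, since the sort shuffle only orders records by partition ID. Below is a minimal plain-Scala simulation of that pattern, with hypothetical names (SecondarySortSketch, part, secondarySort) — a sketch of the idea, not Spark code:

```scala
// Plain-Scala simulation (not Spark) of secondary sort: partition on c1
// only, then sort within each partition by the full (c1, c2) key.
// All names are hypothetical; in Spark this corresponds to
// rdd.repartitionAndSortWithinPartitions(partitioner).
object SecondarySortSketch {
  // partition on the first field only, as the custom partitioner in the thread does
  def part(key: (Int, Int), numPartitions: Int): Int = key._1 % numPartitions

  def secondarySort(rows: Seq[(Int, Int)], numPartitions: Int): Map[Int, Seq[(Int, Int)]] =
    rows.groupBy(r => part(r, numPartitions))
        .map { case (p, rs) => p -> rs.sorted } // sort by (c1, c2) within the partition

  def main(args: Array[String]): Unit = {
    val rows = Seq((3, 5), (3, 1), (1, 3), (3, 2), (1, 2), (2, 4))
    println(secondarySort(rows, 4)(3)) // → List((3,1), (3,2), (3,5))
  }
}
```

In actual Spark code the equivalent would be pairing each row as ((c1, c2), row), using a partitioner keyed on c1, and calling repartitionAndSortWithinPartitions, which sorts by the full key during the shuffle.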

Attachment: Sort-basedshuffledesign.pdf
Description: Adobe PDF document
