There seems to be a way around it. I created a ShuffledRDD with the
partitioner in my code and called setKeyOrdering on it. It worked!

    new ShuffledRDD[String, String, String](pairRDD, new StraightPartitioner(10))
      .setKeyOrdering(new scala.math.Ordering[String] {
        def compare(x: String, y: String) = x compare y
      })
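For what it's worth, here is a small plain-Scala sketch (no Spark dependency) of what that key ordering does within one partition. The `partitionFor` helper is hypothetical and just mirrors StraightPartitioner's logic. One caveat with String keys: the ordering is lexicographic, so "3,10" sorts before "3,2"; an (Int, Int) key with a tuple Ordering avoids that.

```scala
// Plain-Scala sketch (no Spark) of the secondary-sort idea above.
// partitionFor is a hypothetical stand-in mirroring StraightPartitioner.
object SecondarySortSketch {
  // Route a "c1,c2" key to a partition based on c1 only.
  def partitionFor(key: String, numPartitions: Int): Int =
    key.split(",")(0).toInt % numPartitions

  // What setKeyOrdering with the String Ordering gives you within a partition.
  def sortWithinPartition(keys: Seq[String]): Seq[String] =
    keys.sorted // lexicographic: beware that "3,10" < "3,2"

  def main(args: Array[String]): Unit = {
    val keys = Seq("3,5", "3,1", "3,2")
    println(sortWithinPartition(keys).mkString(" ")) // 3,1 3,2 3,5
  }
}
```

Since Spark 1.2, pair RDDs with an ordered key also expose repartitionAndSortWithinPartitions(partitioner), which wraps this same ShuffledRDD-plus-ordering pattern in one call.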

On Wed, Feb 4, 2015 at 8:39 AM, Imran Rashid <iras...@cloudera.com> wrote:

> I think you are interested in secondary sort, which is still being worked
> on:
>
> https://issues.apache.org/jira/browse/SPARK-3655
>
> On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak <nitinkak...@gmail.com> wrote:
>
>> I thought that's what sort-based shuffle did: sort the keys going to the
>> same partition.
>>
>> I have tried (c1, c2) as (Int, Int) tuple as well. I don't think that
>> ordering of c2 type is the problem here.
>>
>> On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> Hm, I don't think the sort partitioner is going to cause the result to
>>> be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
>>> even guaranteed that the type of c2 has an ordering, right?
>>>
>>> On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 <nitinkak...@gmail.com>
>>> wrote:
>>> > I am trying to implement secondary sort in spark as we do in
>>> map-reduce.
>>> >
>>> > Here is my data (tab separated; the header row c1, c2, c3 is not part of the file).
>>> > c1    c2     c3
>>> > 1       2       4
>>> > 1       3       6
>>> > 2       4       7
>>> > 2       6       8
>>> > 3       5       5
>>> > 3       1       8
>>> > 3       2       0
>>> >
>>> > To do secondary sort, I create a paired RDD as
>>> >
>>> > ((c1 + "," + c2), row)
>>> >
>>> > and then use a custom partitioner to partition only on c1. I have set
>>> > spark.shuffle.manager = SORT so the keys per partition are sorted. For the
>>> > key "3" I am expecting to get
>>> > (3, 1)
>>> > (3, 2)
>>> > (3, 5)
>>> > but still getting the original order
>>> > 3,5
>>> > 3,1
>>> > 3,2
>>> >
>>> > Here is the custom partitioner code:
>>> >
>>> > class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>>> >   def numPartitions = p
>>> >   def getPartition(key: Any) = {
>>> >     key.asInstanceOf[String].split(",")(0).toInt
>>> >   }
>>> > }
>>> >
>>> > and the driver code; please tell me what I am doing wrong:
>>> >
>>> > val conf = new SparkConf().setAppName("MapInheritanceExample")
>>> > conf.set("spark.shuffle.manager", "SORT")
>>> > val sc = new SparkContext(conf)
>>> > val pF = sc.textFile(inputFile)
>>> >
>>> > val log = LogFactory.getLog("MapFunctionTest")
>>> > val partitionedRDD = pF.map { x =>
>>> >   val arr = x.split("\t")
>>> >   (arr(0) + "," + arr(1), null)
>>> > }.partitionBy(new StraightPartitioner(10))
>>> >
>>> > val outputRDD = partitionedRDD.mapPartitions(p => {
>>> >   p.map { case (o, n) => o }
>>> > })
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>>
>>
>>
>
