I am trying to implement secondary sort in Spark, the way we do in MapReduce.

Here is my data (tab-separated, without the c1, c2, c3 header):
c1    c2     c3
1       2       4
1       3       6
2       4       7
2       6       8
3       5       5
3       1       8
3       2       0

To do the secondary sort, I create a paired RDD of the form

((c1 + "," + c2), row)
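A side note on the composite key itself: because it is a String, keys compare lexicographically, so "3,10" would sort before "3,2" once c2 has more than one digit. Below is a minimal sketch, assuming c1 and c2 are always integers, that contrasts the string key with an (Int, Int) tuple key. The object and helper names are hypothetical, and the line with c2 = 10 is made up purely to show the pitfall.

```scala
object CompositeKeyDemo {
  // Hypothetical helper: "c1,c2" String key, compared character by character
  def stringKey(line: String): String = {
    val arr = line.split("\t")
    arr(0) + "," + arr(1)
  }

  // Hypothetical helper: (c1, c2) tuple key, compared numerically field by field
  def tupleKey(line: String): (Int, Int) = {
    val arr = line.split("\t")
    (arr(0).toInt, arr(1).toInt)
  }

  def main(args: Array[String]): Unit = {
    // "3\t10\t8" is an invented row used only to show the ordering difference
    val lines = Seq("3\t5\t5", "3\t10\t8", "3\t2\t0")
    // Lexicographic order: "3,10", "3,2", "3,5"
    println(lines.map(stringKey).sorted)
    // Numeric order: (3,2), (3,5), (3,10)
    println(lines.map(tupleKey).sorted)
  }
}
```

With the single-digit sample data above both keys happen to sort the same way, so the string key is not the cause of the problem described below; it only matters once values get wider.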

and then use a custom partitioner to partition on c1 only. I have set
spark.shuffle.manager = SORT so that the keys within each partition would be sorted. For the
key "3" I expect to get
(3, 1)
(3, 2)
(3, 5)
but I am still getting the original order.
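As far as I can tell, partitionBy only decides which partition each record lands in; even with the sort-based shuffle, records are not ordered by key within a partition unless a key ordering is requested. One way to get MapReduce-style secondary sort is repartitionAndSortWithinPartitions (added in Spark 1.2), which shuffles with a given partitioner and sorts each partition by the full key during the shuffle. A minimal sketch, assuming Spark 1.3 or later on the classpath (where the pair-RDD implicits need no extra import), integer c1/c2 and a local master; C1Partitioner and SecondarySortSketch are hypothetical names:

```scala
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

// Hypothetical partitioner: routes a (c1, c2) key by c1 alone
class C1Partitioner(partitions: Int) extends Partitioner {
  private val delegate = new HashPartitioner(partitions)
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (c1: Int, _) => delegate.getPartition(c1)
  }
}

object SecondarySortSketch {
  // Returns the keys of each output partition, in the order Spark produced them
  def run(sc: SparkContext, lines: Seq[String]): Array[Seq[(Int, Int)]] = {
    val pairs = sc.parallelize(lines).map { line =>
      val arr = line.split("\t")
      ((arr(0).toInt, arr(1).toInt), line)
    }
    // Shuffle by c1 only, then sort each partition by the whole (c1, c2) key
    // using the implicit Ordering[(Int, Int)]
    val sorted = pairs.repartitionAndSortWithinPartitions(new C1Partitioner(2))
    sorted.keys.glom().collect().map(_.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SecondarySortSketch").setMaster("local[2]"))
    val data = Seq("1\t2\t4", "1\t3\t6", "2\t4\t7", "2\t6\t8", "3\t5\t5", "3\t1\t8", "3\t2\t0")
    try run(sc, data).foreach(println)
    finally sc.stop()
  }
}
```

The tuple key is used here instead of the "c1,c2" string so that c2 is compared numerically; the same call would also work with the string key and a string-aware partitioner.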

Here is the custom partitioner code:

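class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  // Assumed body (the original was cut off): partition on c1 only
  def getPartition(key: Any) = math.abs(key.toString.split(",")(0).hashCode % p)
}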
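For comparison, here is a minimal sketch of a partitioner that routes a "c1,c2" String key on its c1 prefix only, keeps the partition index non-negative, and overrides equals/hashCode. Spark compares partitioners with equals to decide whether an RDD is already partitioned the way an operation needs, which is why HashPartitioner overrides it too. The class name is hypothetical.

```scala
import org.apache.spark.Partitioner

// Hypothetical name; partitions a "c1,c2" String key on its c1 prefix only
class C1StringPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, "need at least one partition")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val c1 = key.toString.takeWhile(_ != ',')
    val mod = c1.hashCode % partitions
    if (mod < 0) mod + partitions else mod // keep the index in [0, partitions)
  }

  // Two instances with the same partition count route keys identically,
  // so let them compare equal
  override def equals(other: Any): Boolean = other match {
    case p: C1StringPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
```

Note that a partitioner on its own only groups all "3,…" keys into one partition; it does not order them.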

And here is the driver code. Please tell me what I am doing wrong:

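import org.apache.commons.logging.LogFactory
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MapInheritanceExample")
conf.set("spark.shuffle.manager", "SORT")
val sc = new SparkContext(conf)
val pF = sc.textFile(inputFile) // inputFile is defined elsewhere
val log = LogFactory.getLog("MapFunctionTest")
// Key each line by "c1,c2" (the value is left null)
val partitionedRDD = pF.map { x =>
  val arr = x.split("\t")
  (arr(0) + "," + arr(1), null)
}.partitionBy(new StraightPartitioner(10))
val outputRDD = partitionedRDD.mapPartitions(p => {
  p.map({ case (o, n) =>
    o // assumed: the rest of this function was cut off in the archive
  })
})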
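Once a partition is sorted by (c1, c2), the records for one c1 are adjacent, so the body of mapPartitions can emulate a MapReduce reducer (with a grouping comparator on c1) by walking the iterator and cutting it at each change of c1. A minimal sketch in plain Scala, assuming tuple keys; AdjacentGrouping and groupByC1 are hypothetical names:

```scala
object AdjacentGrouping {
  // Given an iterator already sorted by (c1, c2), emit one (c1, values) group per
  // run of equal c1, with the values in c2 order. Groups are produced lazily, but
  // each single group is buffered in memory.
  def groupByC1[V](it: Iterator[((Int, Int), V)]): Iterator[(Int, Seq[V])] = {
    val buffered = it.buffered
    new Iterator[(Int, Seq[V])] {
      def hasNext: Boolean = buffered.hasNext
      def next(): (Int, Seq[V]) = {
        val c1 = buffered.head._1._1
        val values = scala.collection.mutable.ArrayBuffer.empty[V]
        while (buffered.hasNext && buffered.head._1._1 == c1) values += buffered.next()._2
        (c1, values.toList)
      }
    }
  }
}
```

In a Spark job this would be applied after the sorting shuffle, e.g. `sorted.mapPartitions(it => AdjacentGrouping.groupByC1(it))`; it relies on the partition already being sorted and does not sort anything itself.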

Sent from the Apache Spark User List mailing list archive at Nabble.com.