I am trying to implement secondary sort in Spark, as we do in MapReduce.

Here is my data (tab-separated; the header c1, c2, c3 is shown here but is not in the file).
c1    c2     c3
1       2       4
1       3       6
2       4       7
2       6       8
3       5       5
3       1       8
3       2       0

To do the secondary sort, I create a paired RDD as

/((c1 + "," + c2), row)/

and then use a custom partitioner that partitions only on c1. I have set
/spark.shuffle.manager = SORT/, so I expect the keys within each partition
to come back sorted. For c1 = 3 I am expecting to get
(3, 1)
(3, 2)
(3, 5)
but I still get the original order:
3,5
3,1
3,2
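Put differently, the order I expect is what a plain sort on the composite key (c1, c2) would produce. A minimal Scala check of that expectation, with no Spark involved and the sample values taken from the data above:

```scala
object ExpectedOrderCheck extends App {
  // The rows for c1 == 3, in their original file order, as (c1, c2) pairs.
  val rows = Seq((3, 5), (3, 1), (3, 2))

  // Secondary sort: c1 is the same everywhere in this group, so ordering
  // by the tuple (c1, c2) effectively orders by c2.
  val expected = rows.sortBy { case (c1, c2) => (c1, c2) }

  assert(expected == Seq((3, 1), (3, 2), (3, 5)))
  expected.foreach(println)
}
```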

Here is the custom partitioner code:

/class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions: Int = p
  def getPartition(key: Any): Int =
    // Route on c1 only, i.e. the first component of the "c1,c2" key.
    key.asInstanceOf[String].split(",")(0).toInt
}/
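The routing logic itself can be checked without running a Spark job. This standalone sketch uses a hypothetical routeByC1 helper that mirrors getPartition, assuming composite keys of the form "c1,c2", to show that all keys sharing a c1 land in the same partition:

```scala
object RoutingCheck extends App {
  // Mirrors StraightPartitioner.getPartition: take c1 from "c1,c2".
  def routeByC1(key: String): Int = key.split(",")(0).toInt

  // All keys with c1 == 3 should go to partition 3.
  assert(routeByC1("3,1") == 3)
  assert(routeByC1("3,2") == 3)
  assert(routeByC1("3,5") == 3)
  // A different c1 goes to a different partition.
  assert(routeByC1("1,2") == 1)
  println("routing depends only on c1")
}
```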

and here is the driver code. Please tell me what I am doing wrong.

/import org.apache.commons.logging.LogFactory
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MapInheritanceExample")
conf.set("spark.shuffle.manager", "SORT")
val sc = new SparkContext(conf)
val pF = sc.textFile(inputFile)

val log = LogFactory.getLog("MapFunctionTest")

// Build the composite key "c1,c2" and keep the whole row as the value.
val partitionedRDD = pF.map { x =>
  val arr = x.split("\t")
  (arr(0) + "," + arr(1), x)
}.partitionBy(new StraightPartitioner(10))

// Strip the values again so only the keys remain, per partition.
val outputRDD = partitionedRDD.mapPartitions { p =>
  p.map { case (o, n) => o }
}/



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
