Re: Sort based shuffle not working properly?
I think you are interested in secondary sort, which is still being worked on: https://issues.apache.org/jira/browse/SPARK-3655

On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak nitinkak...@gmail.com wrote: I thought that's what sort-based shuffle did: sort the keys going to the same partition. I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that the ordering of the c2 type is the problem here.
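Secondary sort usually means three steps: partition by c1, sort each partition by the composite key (c1, c2), then walk the sorted records so that each c1 group arrives with its values already in c2 order. The last step can be sketched in plain Scala as below, assuming the partition is already sorted by (c1, c2). `groupSortedRuns` is a hypothetical helper for illustration, not a Spark API.

```scala
// Hypothetical helper (not part of Spark): groups consecutive records that
// share the same c1. Assumes the input is already sorted by (c1, c2), so each
// c1 group is contiguous and its values are already in c2 order.
def groupSortedRuns[V](sorted: Iterator[((Int, Int), V)]): Iterator[(Int, Vector[(Int, V)])] = {
  val in = sorted.buffered
  new Iterator[(Int, Vector[(Int, V)])] {
    def hasNext: Boolean = in.hasNext

    def next(): (Int, Vector[(Int, V)]) = {
      val c1 = in.head._1._1
      val group = Vector.newBuilder[(Int, V)]
      // Consume records until c1 changes; only one group is held in memory.
      while (in.hasNext && in.head._1._1 == c1) {
        val ((_, c2), v) = in.next()
        group += ((c2, v))
      }
      (c1, group.result())
    }
  }
}

// Example: one partition, already sorted by (c1, c2).
val sortedPartition = Iterator(((1, 2), 4), ((1, 3), 6), ((3, 1), 8), ((3, 2), 0), ((3, 5), 5))
val groups = groupSortedRuns(sortedPartition).toList
// groups == List((1, Vector((2, 4), (3, 6))), (3, Vector((1, 8), (2, 0), (5, 5))))
```

The iterator form matters on a cluster: a partition can hold many c1 groups, and this walks them one at a time instead of collecting the whole partition into a map.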
RE: Sort based shuffle not working properly?
Nitin,

Suing Spark is not going to help. Perhaps you should sue someone else :-)

Just kidding!

Mohammed

-----Original Message-----
From: nitinkak001 [mailto:nitinkak...@gmail.com]
Sent: Tuesday, February 3, 2015 1:57 PM
To: user@spark.apache.org
Subject: Re: Sort based shuffle not working properly?

Just to add, I am suing Spark 1.1.0
Re: Sort based shuffle not working properly?
Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1,c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right?

On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote: To do secondary sort, I create a paired RDD as ((c1 + "," + c2), row) and then use a custom partitioner to partition only on c1. I have set spark.shuffle.manager = SORT so the keys per partition are sorted.
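The point about ordering can be checked with a plain-Scala sketch (no Spark involved): sorting needs an Ordering for the whole key, and the key type decides what that ordering is. String keys built as c1 + "," + c2 compare character by character, so they would misorder multi-digit values even if something did sort them, while (Int, Int) tuples use Scala's built-in tuple ordering. A key type with no Ordering in scope would make a call such as `sorted` or `sortBy` fail to compile.

```scala
// Keys built as strings, like c1 + "," + c2, sort character by character:
// "3,10" comes before "3,2" because '1' < '2'.
val stringKeys = List("3,5", "3,10", "3,2")
val sortedStrings = stringKeys.sorted   // List("3,10", "3,2", "3,5")

// (Int, Int) keys use Scala's Ordering for Tuple2: compare c1 first, then c2,
// both numerically.
val tupleKeys = List((3, 5), (3, 10), (3, 2))
val sortedTuples = tupleKeys.sorted     // List((3,2), (3,5), (3,10))

// The same ordering spelled out explicitly, as a sort-based API would receive it.
val byC1ThenC2: Ordering[(Int, Int)] = Ordering.Tuple2(Ordering.Int, Ordering.Int)
val c1Wins = byC1ThenC2.lt((2, 9), (3, 0))   // true: c1 is compared first
```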
Re: Sort based shuffle not working properly?
This is an excerpt from the design document for the sort-based shuffle implementation. I am thinking I might be wrong in my perception of sort-based shuffle. I don't completely understand it, though.

*Motivation*
A sort-based shuffle can be more scalable than Spark’s current hash-based one because it doesn’t require writing a separate file for each reduce task from each mapper. Instead, we write a single sorted file and serve ranges of it to different reducers. In jobs with a lot of reduce tasks (say 10,000+), this saves significant memory for compression and serialization buffers and results in more sequential disk I/O.

*Implementation*
To perform a sort-based shuffle, each map task will produce one or more output files sorted by a key’s partition ID, then merge-sort them to yield a single output file. Because it’s only necessary to group the keys together into partitions, we won’t bother to also sort them within each partition.

On Tue, Feb 3, 2015 at 5:41 PM, Nitin kak nitinkak...@gmail.com wrote: I thought that's what sort-based shuffle did: sort the keys going to the same partition. I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that the ordering of the c2 type is the problem here.

Sort-basedshuffledesign.pdf Description: Adobe PDF document
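The excerpt's last sentence explains the observed order. Below is a rough plain-Scala model of that map-side step, assuming it behaves like a stable sort on partition ID alone. This models the excerpt, not Spark's actual code. It reproduces what the original post saw: records that share a partition keep their input order, so key 3 comes out as 3,5 3,1 3,2.

```scala
// Simplified model of the map side described in the excerpt: tag each record
// with its partition ID (here c1 mod numPartitions) and sort by that ID only.
// The keys themselves are never compared.
def mapSideSortByPartition[V](
    records: Seq[((Int, Int), V)],
    numPartitions: Int): Seq[(Int, ((Int, Int), V))] =
  records
    .map(r => (java.lang.Math.floorMod(r._1._1, numPartitions), r))
    .sortBy(_._1) // Scala's sortBy is stable, so ties keep their input order

// The data from the original post, keyed by (c1, c2) with c3 as the value.
val records = Seq(
  ((1, 2), 4), ((1, 3), 6), ((2, 4), 7), ((2, 6), 8),
  ((3, 5), 5), ((3, 1), 8), ((3, 2), 0))

val output = mapSideSortByPartition(records, numPartitions = 10)
val keysInPartition3 = output.filter(_._1 == 3).map(_._2._1)
// keysInPartition3 == Seq((3, 5), (3, 1), (3, 2)): grouped, but not sorted
```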
Re: Sort based shuffle not working properly?
I thought that's what sort-based shuffle did: sort the keys going to the same partition. I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that the ordering of the c2 type is the problem here.

On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote: Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1,c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right?
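With (Int, Int) keys, a partitioner on c1 can match on the tuple instead of splitting a string. The sketch below is a hypothetical helper with the logic such a `getPartition` might use. `floorMod` keeps the index in [0, numPartitions) even for negative or large c1 values, which the original `StraightPartitioner` does not guarantee. It only decides where a record goes: partitioning alone still does not sort by (c1, c2).

```scala
// Hypothetical helper: the partition index for a (c1, c2) key depends on c1 only.
// floorMod keeps the result in [0, numPartitions), including for negative c1.
def partitionByFirst(key: Any, numPartitions: Int): Int = key match {
  case (c1: Int, _) => java.lang.Math.floorMod(c1, numPartitions)
  case other        => throw new IllegalArgumentException(s"unexpected key: $other")
}

// A Spark Partitioner could delegate to it, for example:
//   class FirstElementPartitioner(n: Int) extends org.apache.spark.Partitioner {
//     def numPartitions: Int = n
//     def getPartition(key: Any): Int = partitionByFirst(key, n)
//   }

val partitionsForKey3 = Seq((3, 5), (3, 1), (3, 2)).map(k => partitionByFirst(k, 10))
// all three keys land in partition 3
```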
Sort based shuffle not working properly?
I am trying to implement secondary sort in Spark as we do in MapReduce. Here is my data (tab-separated; the c1, c2, c3 header is not in the file):

    c1  c2  c3
    1   2   4
    1   3   6
    2   4   7
    2   6   8
    3   5   5
    3   1   8
    3   2   0

To do secondary sort, I create a paired RDD as ((c1 + "," + c2), row) and then use a custom partitioner to partition only on c1. I have set spark.shuffle.manager = SORT so the keys per partition are sorted. For the key 3, I am expecting to get (3, 1) (3, 2) (3, 5), but I am still getting the original order: 3,5 3,1 3,2.

Here is the custom partitioner code:

    class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
      def numPartitions = p
      // Partition on c1 only: the part of the "c1,c2" key before the comma.
      def getPartition(key: Any) = {
        key.asInstanceOf[String].split(",")(0).toInt
      }
    }

And the driver code. Please tell me what I am doing wrong:

    import org.apache.commons.logging.LogFactory
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // pair-RDD implicits such as partitionBy (Spark 1.1)

    val conf = new SparkConf().setAppName("MapInheritanceExample")
    conf.set("spark.shuffle.manager", "SORT")
    val sc = new SparkContext(conf)
    val pF = sc.textFile(inputFile)
    val log = LogFactory.getLog("MapFunctionTest")
    // Key each line by the string "c1,c2"; the value is unused (null).
    val partitionedRDD = pF.map { x =>
      val arr = x.split("\t")
      (arr(0) + "," + arr(1), null)
    }.partitionBy(new StraightPartitioner(10))
    // Keep only the keys, in whatever order each partition holds them.
    val outputRDD = partitionedRDD.mapPartitions(p => p.map { case (o, n) => o })

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
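Spark 1.1 does not sort records within a partition in this setup, so one workaround is to sort each partition explicitly after partitioning. Below is a minimal sketch, assuming (Int, Int) keys instead of strings and that each partition fits in memory. `parseLine` and `sortWithinPartition` are hypothetical names, and the functions are plain Scala so they can be tried without a cluster. On Spark 1.2 and later, `repartitionAndSortWithinPartitions` performs the partitioning and the per-partition sort in one shuffle instead.

```scala
// Parse one tab-separated line "c1\tc2\tc3" into ((c1, c2), c3).
def parseLine(line: String): ((Int, Int), Int) = {
  val cols = line.split("\t").map(_.trim.toInt)
  ((cols(0), cols(1)), cols(2))
}

// Sort one partition's records by (c1, c2). The whole partition is buffered
// in memory, which is the price of doing this outside the shuffle.
def sortWithinPartition[V](records: Iterator[((Int, Int), V)]): Iterator[((Int, Int), V)] =
  records.toVector.sortBy(_._1).iterator

// Possible use in the driver (partitionerOnC1 is a hypothetical Partitioner
// that looks only at c1):
//   val sorted = sc.textFile(inputFile)
//     .map(parseLine)
//     .partitionBy(partitionerOnC1)
//     .mapPartitions(it => sortWithinPartition(it), preservesPartitioning = true)

// Local check with the rows for c1 = 3 from the post.
val partition3 = Seq("3\t5\t5", "3\t1\t8", "3\t2\t0").map(parseLine)
val sortedPartition3 = sortWithinPartition(partition3.iterator).toList
// sortedPartition3 == List(((3, 1), 8), ((3, 2), 0), ((3, 5), 5))
```

`preservesPartitioning = true` tells Spark that the keys were not changed, so the partitioner from `partitionBy` still applies to the sorted RDD.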
Re: Sort based shuffle not working properly?
Just to add, I am suing Spark 1.1.0

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487p21488.html