Re: Sort based shuffle not working properly?
Just to add, I am using Spark 1.1.0.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487p21488.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Sort based shuffle not working properly?
Nitin, Suing Spark is not going to help. Perhaps you should sue someone else :-) Just kidding!

Mohammed

-----Original Message-----
From: nitinkak001 [mailto:nitinkak...@gmail.com]
Sent: Tuesday, February 3, 2015 1:57 PM
To: user@spark.apache.org
Subject: Re: Sort based shuffle not working properly?

Just to add, I am suing Spark 1.1.0
Re: Sort based shuffle not working properly?
Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1,c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right?

On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 wrote:
> I am trying to implement secondary sort in Spark as we do in map-reduce.
>
> Here is my data (tab separated, with columns c1, c2, c3):
> c1  c2  c3
> 1   2   4
> 1   3   6
> 2   4   7
> 2   6   8
> 3   5   5
> 3   1   8
> 3   2   0
>
> To do secondary sort, I create a paired RDD as
>
> /((c1 + "," + c2), row)/
>
> and then use a custom partitioner to partition only on c1. I have set
> /spark.shuffle.manager = SORT/ so the keys per partition are sorted. For the
> key "3" I am expecting to get
> (3, 1)
> (3, 2)
> (3, 5)
> but am still getting the original order
> 3,5
> 3,1
> 3,2
>
> Here is the custom partitioner code:
>
> /class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>   def numPartitions = p
>   def getPartition(key: Any) = {
>     key.asInstanceOf[String].split(",")(0).toInt
>   }
> }/
>
> and here is the driver code; please tell me what I am doing wrong:
>
> /val conf = new SparkConf().setAppName("MapInheritanceExample")
> conf.set("spark.shuffle.manager", "SORT")
> val sc = new SparkContext(conf)
> val pF = sc.textFile(inputFile)
>
> val log = LogFactory.getLog("MapFunctionTest")
> val partitionedRDD = pF.map { x =>
>   val arr = x.split("\t")
>   (arr(0) + "," + arr(1), null)
> }.partitionBy(new StraightPartitioner(10))
>
> val outputRDD = partitionedRDD.mapPartitions(p => {
>   p.map { case (o, n) => o }
> })/
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Sort based shuffle not working properly?
I thought that's what sort-based shuffle did: sort the keys going to the same partition.

I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that the ordering of the c2 type is the problem here.

On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen wrote:
> Hm, I don't think the sort partitioner is going to cause the result to
> be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
> even guaranteed that the type of c2 has an ordering, right?
Re: Sort based shuffle not working properly?
This is an excerpt from the design document of the sort-based shuffle implementation. I am thinking I might be wrong in my perception of sort-based shuffle; I don't completely understand it yet, though.

*Motivation* A sort-based shuffle can be more scalable than Spark's current hash-based one because it doesn't require writing a separate file for each reduce task from each mapper. Instead, we write a single sorted file and serve ranges of it to different reducers. In jobs with a lot of reduce tasks (say 10,000+), this saves significant memory for compression and serialization buffers and results in more sequential disk I/O.

*Implementation* To perform a sort-based shuffle, each map task will produce one or more output files sorted by a key's partition ID, then merge-sort them to yield a single output file. Because it's only necessary to group the keys together into partitions, we won't bother to also sort them within each partition.

On Tue, Feb 3, 2015 at 5:41 PM, Nitin kak wrote:
> I thought that's what sort-based shuffle did: sort the keys going to the
> same partition.
>
> I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that
> the ordering of the c2 type is the problem here.

Attachment: Sort-based shuffle design.pdf (Adobe PDF document)
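To make that excerpt concrete, here is a tiny plain-Scala sketch (no Spark involved; the data and partition function are just this thread's example) of what "sorted by a key's partition ID" means: records get grouped by partition, but within a partition they keep their arrival order, because nothing sorts them by the full key.

```scala
object PartitionIdSortSketch extends App {
  // The thread's sample rows, keyed by "c1,c2" as in the original code.
  val records = Seq("3,5", "1,2", "3,1", "2,4", "3,2")

  def partitionOf(key: String): Int = key.split(",")(0).toInt

  // Sort-based shuffle orders map output by partition ID only.
  // Seq.sortBy is stable, so within partition 3 the arrival order
  // 3,5 / 3,1 / 3,2 survives -- exactly what the thread observes.
  val byPartitionId = records.sortBy(partitionOf)
  println(byPartitionId.mkString(" | "))
  // prints: 1,2 | 2,4 | 3,5 | 3,1 | 3,2
}
```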
Re: Sort based shuffle not working properly?
I think you are interested in secondary sort, which is still being worked on:

https://issues.apache.org/jira/browse/SPARK-3655

On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak wrote:
> I thought that's what sort-based shuffle did: sort the keys going to the
> same partition.
>
> I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that
> the ordering of the c2 type is the problem here.
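For later readers: Spark 1.2 added `repartitionAndSortWithinPartitions`, which does in one shuffle what this thread is after. A sketch, assuming Spark 1.2+ (class and object names here are made up for illustration): key by the (c1, c2) tuple, partition on c1 alone, and let the implicit tuple ordering sort each partition by the full key.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Partition on c1 only, so one partition sees all rows for a given c1,
// while the implicit (Int, Int) ordering sorts that partition by (c1, c2).
class C1Partitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (c1: Int, _) => math.abs(c1.hashCode) % numPartitions
  }
}

object SecondarySortSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SecondarySort"))
    val pairs = sc.textFile(args(0)).map { line =>
      val arr = line.split("\t")
      ((arr(0).toInt, arr(1).toInt), line) // composite key (c1, c2)
    }
    // One shuffle; each partition's iterator comes back sorted by the
    // full (c1, c2) key, which is the secondary sort the thread wants.
    val sorted = pairs.repartitionAndSortWithinPartitions(new C1Partitioner(10))
    sorted.values.saveAsTextFile(args(1))
  }
}
```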
Re: Sort based shuffle not working properly?
There seems to be a way around it. I created a ShuffledRDD with the partitioner in my code and used setKeyOrdering on it. It worked!!

*new ShuffledRDD[String, String, String](pairRDD, new StraightPartitioner(10))*
*  .setKeyOrdering(new scala.math.Ordering[String] {*
*    def compare(x: String, y: String) = x compare y*
*  })*

On Wed, Feb 4, 2015 at 8:39 AM, Imran Rashid wrote:
> I think you are interested in secondary sort, which is still being worked
> on:
>
> https://issues.apache.org/jira/browse/SPARK-3655
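One caveat on this workaround: a plain string compare on "c1,c2" keys sorts lexicographically, so "3,10" would come before "3,2". A sketch of a numeric ordering that avoids this, reusing the thread's own names (`pairRDD` and `StraightPartitioner` are assumed to be in scope):

```scala
import org.apache.spark.rdd.ShuffledRDD

// Hypothetical numeric ordering for "c1,c2" string keys; plain
// String#compare would order "3,10" before "3,2".
val numericKeyOrdering = new Ordering[String] {
  def compare(x: String, y: String): Int = {
    val Array(x1, x2) = x.split(",").map(_.toInt)
    val Array(y1, y2) = y.split(",").map(_.toInt)
    val first = x1 compare y1
    if (first != 0) first else x2 compare y2
  }
}

val sortedRDD = new ShuffledRDD[String, String, String](pairRDD, new StraightPartitioner(10))
  .setKeyOrdering(numericKeyOrdering)
```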