Re: Sort based shuffle not working properly?

2015-02-03 Thread nitinkak001
Just to add, I am suing Spark 1.1.0






RE: Sort based shuffle not working properly?

2015-02-03 Thread Mohammed Guller
Nitin,
Suing Spark is not going to help. Perhaps you should sue someone else :-) Just 
kidding!

Mohammed


-Original Message-
From: nitinkak001 [mailto:nitinkak...@gmail.com] 
Sent: Tuesday, February 3, 2015 1:57 PM
To: user@spark.apache.org
Subject: Re: Sort based shuffle not working properly?

Just to add, I am suing Spark 1.1.0






Re: Sort based shuffle not working properly?

2015-02-03 Thread Sean Owen
Hm, I don't think the sort partitioner is going to cause the result to
be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
even guaranteed that the type of c2 has an ordering, right?
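
A side note to make that concrete (my sketch, not Sean's): sorting by the full
key presupposes a total Ordering on the whole key type, and Scala can only
supply one implicitly when every component of the key has one.

// Scala derives Ordering[(Int, Int)] because Ordering[Int] exists; a custom
// c2 type would need a hand-written Ordering before any sort could use it.
val byC1ThenC2 = implicitly[Ordering[(Int, Int)]]
byC1ThenC2.compare((3, 1), (3, 2))  // -1: compares c1 first, then c2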

On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 wrote:
> I am trying to implement secondary sort in Spark as we do in MapReduce.
>
> Here is my data (tab-separated; the header row c1, c2, c3 is not in the file):
> c1  c2  c3
> 1   2   4
> 1   3   6
> 2   4   7
> 2   6   8
> 3   5   5
> 3   1   8
> 3   2   0
>
> To do secondary sort, I create a paired RDD as
>
> ((c1 + "," + c2), row)
>
> and then use a custom partitioner to partition only on c1. I have set
> spark.shuffle.manager = SORT so the keys per partition are sorted. For the
> key "3" I am expecting to get
> (3, 1)
> (3, 2)
> (3, 5)
> but am still getting the original order
> 3,5
> 3,1
> 3,2
>
> Here is the custom partitioner code:
>
> class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>   def numPartitions = p
>   def getPartition(key: Any) = {
>     key.asInstanceOf[String].split(",")(0).toInt
>   }
> }
>
> and here is the driver code; please tell me what I am doing wrong:
>
> val conf = new SparkConf().setAppName("MapInheritanceExample")
> conf.set("spark.shuffle.manager", "SORT")
> val sc = new SparkContext(conf)
> val pF = sc.textFile(inputFile)
>
> val log = LogFactory.getLog("MapFunctionTest")
> val partitionedRDD = pF.map { x =>
>   val arr = x.split("\t")
>   (arr(0) + "," + arr(1), null)
> }.partitionBy(new StraightPartitioner(10))
>
> val outputRDD = partitionedRDD.mapPartitions(p => {
>   p.map { case (o, n) => o }
> })




Re: Sort based shuffle not working properly?

2015-02-03 Thread Nitin kak
I thought that's what sort-based shuffle did: sort the keys going to the
same partition.

I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think the
ordering of c2's type is the problem here.

On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen  wrote:

> Hm, I don't think the sort partitioner is going to cause the result to
> be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
> even guaranteed that the type of c2 has an ordering, right?
>
> On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 wrote:
> > I am trying to implement secondary sort in Spark as we do in MapReduce.
> >
> > Here is my data (tab-separated; the header row c1, c2, c3 is not in the file):
> > c1  c2  c3
> > 1   2   4
> > 1   3   6
> > 2   4   7
> > 2   6   8
> > 3   5   5
> > 3   1   8
> > 3   2   0
> >
> > To do secondary sort, I create a paired RDD as
> >
> > ((c1 + "," + c2), row)
> >
> > and then use a custom partitioner to partition only on c1. I have set
> > spark.shuffle.manager = SORT so the keys per partition are sorted. For the
> > key "3" I am expecting to get
> > (3, 1)
> > (3, 2)
> > (3, 5)
> > but am still getting the original order
> > 3,5
> > 3,1
> > 3,2
> >
> > Here is the custom partitioner code:
> >
> > class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
> >   def numPartitions = p
> >   def getPartition(key: Any) = {
> >     key.asInstanceOf[String].split(",")(0).toInt
> >   }
> > }
> >
> > and here is the driver code; please tell me what I am doing wrong:
> >
> > val conf = new SparkConf().setAppName("MapInheritanceExample")
> > conf.set("spark.shuffle.manager", "SORT")
> > val sc = new SparkContext(conf)
> > val pF = sc.textFile(inputFile)
> >
> > val log = LogFactory.getLog("MapFunctionTest")
> > val partitionedRDD = pF.map { x =>
> >   val arr = x.split("\t")
> >   (arr(0) + "," + arr(1), null)
> > }.partitionBy(new StraightPartitioner(10))
> >
> > val outputRDD = partitionedRDD.mapPartitions(p => {
> >   p.map { case (o, n) => o }
> > })


Re: Sort based shuffle not working properly?

2015-02-03 Thread Nitin kak
This is an excerpt from the design document for the sort-based shuffle
implementation. I am thinking I might be wrong in my perception of sort-based
shuffle; I don't completely understand it yet.

*Motivation*
A sort-based shuffle can be more scalable than Spark's current hash-based
one because it doesn't require writing a separate file for each reduce task
from each mapper. Instead, we write a single sorted file and serve ranges
of it to different reducers. In jobs with a lot of reduce tasks (say
10,000+), this saves significant memory for compression and serialization
buffers and results in more sequential disk I/O.

*Implementation*
To perform a sort-based shuffle, each map task will produce one or more
output files sorted by a key's partition ID, then merge-sort them to yield
a single output file. Because it's only necessary to group the keys
together into partitions, we won't bother to also sort them within each
partition.
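
Reading it that way, the excerpt explains the behaviour above: the shuffle
sorts by partition ID only, so rows inside a partition keep their map-output
order. A small check along those lines (my sketch, reusing StraightPartitioner
from earlier in the thread; not from the design doc):

import org.apache.spark.SparkContext._  // pair-RDD functions in Spark 1.1

// The c1 = 3 rows from the sample data, keyed by "c1,c2" as in the question.
val pairs = sc.parallelize(Seq("3\t5\t5", "3\t1\t8", "3\t2\t0")).map { x =>
  val arr = x.split("\t")
  (arr(0) + "," + arr(1), x)
}

// partitionBy only routes each record to a partition; per the excerpt, the
// shuffle sorts by partition ID, not by key within a partition.
val partitioned = pairs.partitionBy(new StraightPartitioner(10))

partitioned.mapPartitionsWithIndex { (i, it) =>
  Iterator((i, it.map(_._1).toList))
}.collect().foreach(println)
// Partition 3 prints List(3,5, 3,1, 3,2): grouped together, not key-sorted.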

On Tue, Feb 3, 2015 at 5:41 PM, Nitin kak  wrote:

> I thought that's what sort-based shuffle did: sort the keys going to the
> same partition.
>
> I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think the
> ordering of c2's type is the problem here.
>
> On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen  wrote:
>
>> Hm, I don't think the sort partitioner is going to cause the result to
>> be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
>> even guaranteed that the type of c2 has an ordering, right?
>>
>> On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 wrote:
>> > I am trying to implement secondary sort in Spark as we do in MapReduce.
>> >
>> > Here is my data (tab-separated; the header row c1, c2, c3 is not in the file):
>> > c1  c2  c3
>> > 1   2   4
>> > 1   3   6
>> > 2   4   7
>> > 2   6   8
>> > 3   5   5
>> > 3   1   8
>> > 3   2   0
>> >
>> > To do secondary sort, I create a paired RDD as
>> >
>> > ((c1 + "," + c2), row)
>> >
>> > and then use a custom partitioner to partition only on c1. I have set
>> > spark.shuffle.manager = SORT so the keys per partition are sorted. For the
>> > key "3" I am expecting to get
>> > (3, 1)
>> > (3, 2)
>> > (3, 5)
>> > but am still getting the original order
>> > 3,5
>> > 3,1
>> > 3,2
>> >
>> > Here is the custom partitioner code:
>> >
>> > class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>> >   def numPartitions = p
>> >   def getPartition(key: Any) = {
>> >     key.asInstanceOf[String].split(",")(0).toInt
>> >   }
>> > }
>> >
>> > and here is the driver code; please tell me what I am doing wrong:
>> >
>> > val conf = new SparkConf().setAppName("MapInheritanceExample")
>> > conf.set("spark.shuffle.manager", "SORT")
>> > val sc = new SparkContext(conf)
>> > val pF = sc.textFile(inputFile)
>> >
>> > val log = LogFactory.getLog("MapFunctionTest")
>> > val partitionedRDD = pF.map { x =>
>> >   val arr = x.split("\t")
>> >   (arr(0) + "," + arr(1), null)
>> > }.partitionBy(new StraightPartitioner(10))
>> >
>> > val outputRDD = partitionedRDD.mapPartitions(p => {
>> >   p.map { case (o, n) => o }
>> > })


Attachment: Sort-based shuffle design.pdf (Adobe PDF document)


Re: Sort based shuffle not working properly?

2015-02-04 Thread Imran Rashid
I think you are interested in secondary sort, which is still being worked
on:

https://issues.apache.org/jira/browse/SPARK-3655
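
For what it's worth, Spark 1.2 added repartitionAndSortWithinPartitions, which
covers the partition-by-c1, sort-by-(c1, c2) pattern directly. A minimal
sketch (mine, not from this thread; FirstFieldPartitioner is a hypothetical
name, and sc/inputFile are as in the driver code quoted below):

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._  // ordered-RDD functions

// Route on c1 only, as StraightPartitioner does, but for (Int, Int) keys.
class FirstFieldPartitioner(n: Int) extends Partitioner {
  def numPartitions = n
  def getPartition(key: Any) = key.asInstanceOf[(Int, Int)]._1 % n
}

// With (c1, c2) tuple keys, the implicit Ordering[(Int, Int)] sorts each
// partition by c1 and then c2 during the shuffle itself.
val pairs = sc.textFile(inputFile).map { line =>
  val a = line.split("\t")
  ((a(0).toInt, a(1).toInt), line)
}
val sorted = pairs.repartitionAndSortWithinPartitions(new FirstFieldPartitioner(10))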

On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak  wrote:

> I thought that's what sort-based shuffle did: sort the keys going to the
> same partition.
>
> I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think the
> ordering of c2's type is the problem here.
>
> On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen  wrote:
>
>> Hm, I don't think the sort partitioner is going to cause the result to
>> be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
>> even guaranteed that the type of c2 has an ordering, right?
>>
>> On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 wrote:
>> > I am trying to implement secondary sort in Spark as we do in MapReduce.
>> >
>> > Here is my data (tab-separated; the header row c1, c2, c3 is not in the file):
>> > c1  c2  c3
>> > 1   2   4
>> > 1   3   6
>> > 2   4   7
>> > 2   6   8
>> > 3   5   5
>> > 3   1   8
>> > 3   2   0
>> >
>> > To do secondary sort, I create a paired RDD as
>> >
>> > ((c1 + "," + c2), row)
>> >
>> > and then use a custom partitioner to partition only on c1. I have set
>> > spark.shuffle.manager = SORT so the keys per partition are sorted. For the
>> > key "3" I am expecting to get
>> > (3, 1)
>> > (3, 2)
>> > (3, 5)
>> > but am still getting the original order
>> > 3,5
>> > 3,1
>> > 3,2
>> >
>> > Here is the custom partitioner code:
>> >
>> > class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>> >   def numPartitions = p
>> >   def getPartition(key: Any) = {
>> >     key.asInstanceOf[String].split(",")(0).toInt
>> >   }
>> > }
>> >
>> > and here is the driver code; please tell me what I am doing wrong:
>> >
>> > val conf = new SparkConf().setAppName("MapInheritanceExample")
>> > conf.set("spark.shuffle.manager", "SORT")
>> > val sc = new SparkContext(conf)
>> > val pF = sc.textFile(inputFile)
>> >
>> > val log = LogFactory.getLog("MapFunctionTest")
>> > val partitionedRDD = pF.map { x =>
>> >   val arr = x.split("\t")
>> >   (arr(0) + "," + arr(1), null)
>> > }.partitionBy(new StraightPartitioner(10))
>> >
>> > val outputRDD = partitionedRDD.mapPartitions(p => {
>> >   p.map { case (o, n) => o }
>> > })


Re: Sort based shuffle not working properly?

2015-02-04 Thread Nitin kak
There seems to be a way around it. I created a ShuffledRDD with the
partitioner in my code and used setKeyOrdering on it. It worked!!

new ShuffledRDD[String, String, String](pairRDD, new StraightPartitioner(10))
  .setKeyOrdering(new scala.math.Ordering[String] {
    def compare(x: String, y: String) = x compare y
  })
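
One caveat worth flagging (my note, not raised in the thread): comparing the
"c1,c2" keys as plain Strings sorts lexicographically, so a key like "3,10"
would land before "3,2". A numeric ordering for the same setup, assuming every
key really is two comma-separated integers:

// Compare c1 numerically first, then c2; assumes keys look like "c1,c2".
val numericKeyOrdering = new scala.math.Ordering[String] {
  def compare(x: String, y: String) = {
    val Array(x1, x2) = x.split(",").map(_.toInt)
    val Array(y1, y2) = y.split(",").map(_.toInt)
    if (x1 != y1) x1 compare y1 else x2 compare y2
  }
}

new ShuffledRDD[String, String, String](pairRDD, new StraightPartitioner(10))
  .setKeyOrdering(numericKeyOrdering)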

On Wed, Feb 4, 2015 at 8:39 AM, Imran Rashid  wrote:

> I think you are interested in secondary sort, which is still being worked
> on:
>
> https://issues.apache.org/jira/browse/SPARK-3655
>
> On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak  wrote:
>
>> I thought that's what sort-based shuffle did: sort the keys going to the
>> same partition.
>>
>> I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think the
>> ordering of c2's type is the problem here.
>>
>> On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen  wrote:
>>
>>> Hm, I don't think the sort partitioner is going to cause the result to
>>> be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
>>> even guaranteed that the type of c2 has an ordering, right?
>>>
>>> On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 wrote:
>>> > I am trying to implement secondary sort in Spark as we do in MapReduce.
>>> >
>>> > Here is my data (tab-separated; the header row c1, c2, c3 is not in the file):
>>> > c1  c2  c3
>>> > 1   2   4
>>> > 1   3   6
>>> > 2   4   7
>>> > 2   6   8
>>> > 3   5   5
>>> > 3   1   8
>>> > 3   2   0
>>> >
>>> > To do secondary sort, I create a paired RDD as
>>> >
>>> > ((c1 + "," + c2), row)
>>> >
>>> > and then use a custom partitioner to partition only on c1. I have set
>>> > spark.shuffle.manager = SORT so the keys per partition are sorted. For the
>>> > key "3" I am expecting to get
>>> > (3, 1)
>>> > (3, 2)
>>> > (3, 5)
>>> > but am still getting the original order
>>> > 3,5
>>> > 3,1
>>> > 3,2
>>> >
>>> > Here is the custom partitioner code:
>>> >
>>> > class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>>> >   def numPartitions = p
>>> >   def getPartition(key: Any) = {
>>> >     key.asInstanceOf[String].split(",")(0).toInt
>>> >   }
>>> > }
>>> >
>>> > and here is the driver code; please tell me what I am doing wrong:
>>> >
>>> > val conf = new SparkConf().setAppName("MapInheritanceExample")
>>> > conf.set("spark.shuffle.manager", "SORT")
>>> > val sc = new SparkContext(conf)
>>> > val pF = sc.textFile(inputFile)
>>> >
>>> > val log = LogFactory.getLog("MapFunctionTest")
>>> > val partitionedRDD = pF.map { x =>
>>> >   val arr = x.split("\t")
>>> >   (arr(0) + "," + arr(1), null)
>>> > }.partitionBy(new StraightPartitioner(10))
>>> >
>>> > val outputRDD = partitionedRDD.mapPartitions(p => {
>>> >   p.map { case (o, n) => o }
>>> > })