Re: Sort based shuffle not working properly?

2015-02-04 Thread Imran Rashid
I think you are interested in secondary sort, which is still being worked
on:

https://issues.apache.org/jira/browse/SPARK-3655
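
Until that lands, one workaround is a composite key plus
repartitionAndSortWithinPartitions, which was added in 1.2 (you mentioned
1.1.0, so this would need an upgrade). A rough, untested sketch -- the class
and variable names (C1Partitioner, rows) are just for illustration:

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._  // pair-RDD implicits needed on 1.2.x

class C1Partitioner(override val numPartitions: Int) extends Partitioner {
  // partition on c1 only; repartitionAndSortWithinPartitions then sorts
  // each partition by the full (c1, c2) key
  def getPartition(key: Any): Int =
    key.asInstanceOf[(Int, Int)]._1 % numPartitions
}

val pairs = rows.map { line =>              // rows: your input RDD[String]
  val arr = line.split("\t")
  ((arr(0).toInt, arr(1).toInt), line)      // composite key, whole row as value
}
val secondarySorted =
  pairs.repartitionAndSortWithinPartitions(new C1Partitioner(10))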

On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak nitinkak...@gmail.com wrote:

 I thought that's what sort-based shuffle did: sort the keys going to the
 same partition.

 I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that
 the ordering of c2's type is the problem here.

 On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote:

 Hm, I don't think the sort partitioner is going to cause the result to
 be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
 even guaranteed that the type of c2 has an ordering, right?






RE: Sort based shuffle not working properly?

2015-02-03 Thread Mohammed Guller
Nitin,
Suing Spark is not going to help. Perhaps you should sue someone else :-) Just 
kidding!

Mohammed


-----Original Message-----
From: nitinkak001 [mailto:nitinkak...@gmail.com] 
Sent: Tuesday, February 3, 2015 1:57 PM
To: user@spark.apache.org
Subject: Re: Sort based shuffle not working properly?

Just to add, I am suing Spark 1.1.0






Re: Sort based shuffle not working properly?

2015-02-03 Thread Sean Owen
Hm, I don't think the sort partitioner is going to cause the result to
be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
even guaranteed that the type of c2 has an ordering, right?
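
If you want that order you have to create it yourself. A minimal, untested
sketch that sorts each partition after your partitionBy (assumes the String
keys from your example, and that each partition fits in memory):

val sortedWithinPartitions = partitionedRDD.mapPartitions { iter =>
  iter.toSeq.sortBy { case (key, _) =>
    val Array(c1, c2) = key.split(",")
    (c1.toInt, c2.toInt)              // order by (c1, c2) numerically
  }.iterator
}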




Re: Sort based shuffle not working properly?

2015-02-03 Thread Nitin kak
This is an excerpt from the design document of the sort-based shuffle
implementation. I am thinking I might be wrong in my perception of
sort-based shuffle; I don't completely understand it though.

*Motivation*
A sort-based shuffle can be more scalable than Spark's current hash-based
one because it doesn't require writing a separate file for each reduce task
from each mapper. Instead, we write a single sorted file and serve ranges
of it to different reducers. In jobs with a lot of reduce tasks (say
10,000+), this saves significant memory for compression and serialization
buffers and results in more sequential disk I/O.

*Implementation*
To perform a sort-based shuffle, each map task will produce one or more
output files sorted by a key's partition ID, then merge-sort them to yield
a single output file. Because it's only necessary to group the keys
together into partitions, we won't bother to also sort them within each
partition.
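
If that's right, the shuffle only orders records by partition ID, and within
my partition the keys simply stay in arrival order -- which would explain
exactly what I am seeing. A toy illustration (plain Scala, no Spark, just to
check my understanding):

// Stable sort by partition id only, the way the map output is written.
val records = Seq("3,5", "1,2", "3,1", "2,4", "3,2")
val byPartitionId = records.sortBy(_.split(",")(0).toInt)
println(byPartitionId.filter(_.startsWith("3")))
// List(3,5, 3,1, 3,2) -- still arrival order, not sorted by c2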






Sort-basedshuffledesign.pdf
Description: Adobe PDF document


Re: Sort based shuffle not working properly?

2015-02-03 Thread Nitin kak
I thought that's what sort-based shuffle did: sort the keys going to the
same partition.

I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that
the ordering of c2's type is the problem here.
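
For what it's worth, (Int, Int) already has an implicit Ordering in scope, so
a missing ordering on the key type can't be what blocks the sort. A quick
sanity check (hypothetical snippet, plain Scala):

val ord = implicitly[Ordering[(Int, Int)]]
println(ord.compare((3, 1), (3, 2)))  // negative: tuples compare element-wise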

On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote:

 Hm, I don't think the sort partitioner is going to cause the result to
 be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
 even guaranteed that the type of c2 has an ordering, right?




Sort based shuffle not working properly?

2015-02-03 Thread nitinkak001
I am trying to implement secondary sort in Spark as we do in MapReduce.

Here is my data (tab separated; the header c1, c2, c3 below is not part of
the file):
c1  c2  c3
1   2   4
1   3   6
2   4   7
2   6   8
3   5   5
3   1   8
3   2   0

To do secondary sort, I create a paired RDD as

(c1 + "," + c2, row)

and then use a custom partitioner to partition only on c1. I have set
spark.shuffle.manager = SORT so the keys per partition are sorted. For the
key 3 I am expecting to get 
(3, 1)
(3, 2)
(3, 5)
but still getting the original order
3,5
3,1
3,2

Here is the custom partitioner code:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = {
    key.asInstanceOf[String].split(",")(0).toInt
  }
}
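
A side note on the partitioner: getPartition above returns c1 directly, which
only stays in range because my c1 values (1-3) are smaller than the 10
partitions I use below. A more defensive variant (hypothetical, not what I'm
running) would be:

def getPartition(key: Any): Int =
  key.asInstanceOf[String].split(",")(0).toInt % numPartitions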

and the driver code. Please tell me what I am doing wrong:

import org.apache.commons.logging.LogFactory
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MapInheritanceExample")
conf.set("spark.shuffle.manager", "SORT")
val sc = new SparkContext(conf)
val pF = sc.textFile(inputFile)

val log = LogFactory.getLog("MapFunctionTest")
val partitionedRDD = pF.map { x =>
  val arr = x.split("\t")
  (arr(0) + "," + arr(1), null)        // key = "c1,c2", value unused
}.partitionBy(new StraightPartitioner(10))

var outputRDD = partitionedRDD.mapPartitions(p => {
  p.map({ case (o, n) => o })          // keep just the key
})






Re: Sort based shuffle not working properly?

2015-02-03 Thread nitinkak001
Just to add, I am suing Spark 1.1.0


