Re: Secondary Sort using Apache Spark 1.6

2017-03-29 Thread Pariksheet Barapatre
Many thanks, Yong.

Your solution rocks. If you could paste your answer on Stack Overflow, I
can mark it as the correct answer.

Also, can you tell me how to achieve the same using a companion object?

Cheers
Pari




-- 
Cheers,
Pari


Re: Secondary Sort using Apache Spark 1.6

2017-03-29 Thread Yong Zhang
The error message indeed is not very clear.


What went wrong is that repartitionAndSortWithinPartitions not only
requires a PairRDD but also an OrderedRDD: the method is defined on
OrderedRDDFunctions and only becomes available through an implicit
conversion that needs an Ordering for the key type in scope, which is why
the compiler reports it as "not a member". Your case class used as the key
is NOT Ordered.


Either extend it from Ordered, or provide a companion object that supplies
the implicit Ordering.
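
A minimal sketch of the companion-object route (untested; same ordering
logic as the thread uses, just moved so it is always in implicit scope for
the key type):

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)

object DeviceKey {
  // Implicits defined in the companion object are found automatically
  // whenever an Ordering[DeviceKey] is required; the case class itself
  // stays plain and does not need to extend Ordered.
  implicit val ordering: Ordering[DeviceKey] =
    Ordering.by(k => (k.serialNum, k.eventDate, k.EventTs * -1))
}

The Ordered[DeviceKey] route, run in spark-shell, looks like this: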


scala> spark.version
res1: String = 2.1.0

scala> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)
extends Ordered[DeviceKey] {
 |   import scala.math.Ordered.orderingToOrdered
 |   def compare(that: DeviceKey): Int =
 |  (this.serialNum, this.eventDate, this.EventTs * -1) compare
 |  (that.serialNum, that.eventDate, that.EventTs * -1)
 | }
defined class DeviceKey

scala>

scala> val t = sc.parallelize(List((DeviceKey("2","100",1),1),
(DeviceKey("2","100",3),1)), 1)
t: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ParallelCollectionRDD[0] at
parallelize at <console>:26

scala>

scala> class DeviceKeyPartitioner(partitions: Int) extends 
org.apache.spark.Partitioner {
 | require(partitions >= 0, s"Number of partitions ($partitions) cannot 
be negative.")
 |
 | override def numPartitions: Int = partitions
 |
 | override def getPartition(key: Any): Int = {
 |   val k = key.asInstanceOf[DeviceKey]
 |   // hashCode can be negative; normalize into [0, numPartitions)
 |   ((k.serialNum.hashCode() % numPartitions) + numPartitions) % numPartitions
 | }
 | }
defined class DeviceKeyPartitioner

scala>

scala> t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))
res0: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ShuffledRDD[1] at
repartitionAndSortWithinPartitions at <console>:30
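
To sanity-check the result, an untested sketch that prints each partition's
contents (keys should come out sorted within a partition, EventTs descending
for equal serialNum and eventDate):

val sorted = t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))
// glom() turns each partition into an Array so it can be inspected as a unit
sorted.glom().collect().foreach(part => println(part.mkString(", ")))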


Yong





Secondary Sort using Apache Spark 1.6

2017-03-29 Thread Pariksheet Barapatre
Hi,


I am referring to the web link http://codingjunkie.net/spark-secondary-sort/
to implement a secondary sort in my Spark job.

I have defined my key case class as

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
  implicit def orderingBySerialNum[A <: DeviceKey]: Ordering[A] = {
    Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
  }
}

but when I try to apply the function
t.repartitionAndSortWithinPartitions(partitioner)

(t is an RDD[(DeviceKey, Int)])

I get the error:
value repartitionAndSortWithinPartitions is not a member of
org.apache.spark.rdd.RDD[(DeviceKey, Int)]
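
The method in question lives on OrderedRDDFunctions and is only reachable
when an Ordering[DeviceKey] is in implicit scope at the call site; an
untested sketch of a minimal call-site workaround (the val name is
illustrative):

implicit val deviceKeyOrdering: Ordering[DeviceKey] =
  Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))

val repartitioned = t.repartitionAndSortWithinPartitions(partitioner)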


Example code available at
http://stackoverflow.com/questions/43038682/secondary-sort-using-apache-spark-1-6

Could somebody help me to understand the error?

Many Thanks

Pari


-- 
Cheers,
Pari