Re: Equally weighted partitions in Spark

2014-05-15 Thread deenar.toraskar
This is my first implementation. There are a few rough edges, but when I run
this I get the following exception. The class extends Partitioner, which in
turn extends Serializable. Any idea what I am doing wrong?

scala> res156.partitionBy(new EqualWeightPartitioner(1000, res156, weightFunction))
14/05/09 16:59:36 INFO SparkContext: Starting job: histogram at <console>:197
14/05/09 16:59:36 INFO DAGScheduler: Got job 18 (histogram at <console>:197) with 250 output partitions (allowLocal=false)
14/05/09 16:59:36 INFO DAGScheduler: Final stage: Stage 18 (histogram at <console>:197)
14/05/09 16:59:36 INFO DAGScheduler: Parents of final stage: List()
14/05/09 16:59:36 INFO DAGScheduler: Missing parents: List()
14/05/09 16:59:36 INFO DAGScheduler: Submitting Stage 18 (MapPartitionsRDD[36] at histogram at <console>:197), which has no missing parents
14/05/09 16:59:36 INFO DAGScheduler: Failed to run histogram at <console>:197
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: scala.util.Random
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
==


import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._ // brings in the implicit DoubleRDDFunctions used by histogram()
import org.apache.spark.rdd.RDD

/**
 * A [[org.apache.spark.Partitioner]] that tries to give each partition roughly
 * equal total weight, where the weight of a record is computed by a
 * user-supplied function of its key. Partition assignments are derived from a
 * histogram of the weights of the RDD passed in.
 */
class EqualWeightPartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    weightFunction: K => Double) extends Partitioner {

  private val ordering = implicitly[Ordering[K]]
  private val histogram =
    rdd.map(x => (x._1, weightFunction(x._1))).map(x => x._2.toDouble).histogram(partitions)
  private val bucketSize = histogram._1(1) - histogram._1(0)

  // need to refine this algorithm to use single partitions

  // Maps each histogram bucket to an inclusive (first, last) range of candidate partitions
  private val bucketToPartitionMap: Array[(Int, Int)] = {
    if (partitions == 1) {
      Array()
    } else {
      val bucketWeights = histogram._2.zipWithIndex
        .map { case (y, idx) => y * (histogram._1(idx) + histogram._1(idx + 1)) / 2 }
        .map { var s: Double = 0.0; d => { s += d; s } }
      val averageWeight = bucketWeights.last / partitions
      val bucketPartition: Array[(Int, Int)] =
        bucketWeights.map(x => Math.max(0, Math.round(x / averageWeight) - 1).toInt).zipWithIndex
      bucketPartition.map(x => x._2 match {
        case 0 => (0, x._1)
        case _ => (Math.min(partitions - 1, bucketPartition(x._2 - 1)._1 + 1), x._1)
      })
    }
  }

  def numPartitions = partitions

  def getPartition(key: Any): Int = {
    val rnd = new Random
    val k = key.asInstanceOf[K]
    val weight: Double = weightFunction(k)
    val bucketIndex: Int = Math.min(1, (weight / bucketSize).toInt)
    val partitionRange: (Int, Int) = bucketToPartitionMap(bucketIndex - 1)
    partitionRange._1 + rnd.nextInt(partitionRange._2 - partitionRange._1 + 1)
  }

  override def equals(other: Any): Boolean = other match {
    case r: EqualWeightPartitioner[_, _] =>
      r.bucketToPartitionMap.sameElements(bucketToPartitionMap)
    case _ =>
      false
  }

}
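
(A hedged guess at the exception, since the offending Random is not visible in the snippet: "Task not
serializable" means the closure shipped to the executors captures something non-serializable. The
histogram closure above is defined inside the class, so it captures the partitioner instance and
weightFunction, and anything defined in the spark-shell also carries references to the enclosing REPL
line objects, which may hold an unrelated scala.util.Random from an earlier line. Two common
mitigations are to copy what the closure needs into local vals, roughly as sketched below, and/or to
define the partitioner in compiled code rather than at the console.)

  // Sketch only: compute the histogram from a local copy of the weight function
  // so the RDD closure does not capture `this` (or, in the shell, REPL state).
  private val histogram = {
    val localWeight = weightFunction
    rdd.map(x => localWeight(x._1)).histogram(partitions)
  }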




Re: Equally weighted partitions in Spark

2014-05-15 Thread Syed A. Hashmi
I took a stab at it and wrote a partitioner that I intend to contribute back
to the main repo some time later. The partitioner takes a parameter which
governs the minimum number of keys per partition, and once all partitions hit
that limit, it goes round robin. An alternate strategy could be to go round
robin by default. This partitioner will guarantee equally sized partitions
without tinkering with hash codes, complex balancing computations, etc.
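
Since the linked code is not included in this thread, here is a rough,
hypothetical sketch of the strategy described above (invented class and
parameter names, not the actual implementation):

import org.apache.spark.Partitioner

// Hypothetical sketch: fill each partition up to minKeysPerPartition, then
// fall back to round robin. getPartition keeps mutable per-task state, so the
// same key may land in different partitions on different map tasks; that
// balances load but breaks key-based operations such as reduceByKey.
class MinKeysThenRoundRobinPartitioner(
    override val numPartitions: Int,
    minKeysPerPartition: Long) extends Partitioner {

  private var current = 0
  private val counts = new Array[Long](numPartitions)

  override def getPartition(key: Any): Int = {
    if (counts(current) >= minKeysPerPartition) {
      current = (current + 1) % numPartitions // current partition hit the limit, move on
    }
    counts(current) += 1
    current
  }
}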

Wanted to get your thoughts on this and any critical comments suggesting
improvements.

Thanks,
Syed.


On Sat, May 3, 2014 at 6:12 PM, Chris Fregly  wrote:

> @deenar-  i like the custom partitioner strategy that you mentioned.  i
> think it's very useful.
>
> as a thought-exercise, is it possible to re-partition your RDD to
> more-evenly distribute the long-running tasks among the short-running tasks
> by ordering the key's differently?  this would play nice with the existing
> RangePartitioner.
>
> or perhaps manipulate the key's hashCode() to more-evenly-distribute the
> tasks to play nicely with the existing HashPartitioner?
>
> i don't know if either of these are beneficial, but throwing them out for
> the sake of conversation...
>
> -chris
>
>
> On Fri, May 2, 2014 at 11:10 AM, Andrew Ash  wrote:
>
>> Deenar,
>>
>> I haven't heard of any activity to do partitioning in that way, but it
>> does seem more broadly valuable.
>>
>>
>> On Fri, May 2, 2014 at 10:15 AM, deenar.toraskar 
>> wrote:
>>
>>> I have equal sized partitions now, but I want the RDD to be partitioned
>>> such
>>> that the partitions are equally weighted by some attribute of each RDD
>>> element (e.g. size or complexity).
>>>
>>> I have been looking at the RangePartitioner code and I have come up with
>>> something like
>>>
>>> EquallyWeightedPartitioner(noOfPartitions, weightFunction)
>>>
>>> 1) take a sum or (sample) of complexities of all elements and calculate
>>> average weight per partition
>>> 2) take a histogram of weights
>>> 3) assign a list of partitions to each bucket
>>> 4)  getPartition(key: Any): Int would
>>>   a) get the weight and then find the bucket
>>>   b) assign a random partition from the list of partitions associated
>>> with
>>> each bucket
>>>
>>> Just wanted to know if someone else had come across this issue before and
>>> there was a better way of doing this.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171p5212.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>>
>


Re: Equally weighted partitions in Spark

2014-05-03 Thread Chris Fregly
@deenar-  i like the custom partitioner strategy that you mentioned.  i
think it's very useful.

as a thought-exercise, is it possible to re-partition your RDD to
more-evenly distribute the long-running tasks among the short-running tasks
by ordering the keys differently?  this would play nice with the existing
RangePartitioner.

or perhaps manipulate the key's hashCode() to more-evenly-distribute the
tasks to play nicely with the existing HashPartitioner?
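
one common way to realize the hashCode() idea is key salting; a hypothetical
sketch (invented names and thresholds, not a tested recipe) follows:

import scala.util.Random

// Heavy keys get a random salt so the default HashPartitioner spreads them
// over several partitions; light keys keep salt 0 and stay together. Anything
// keyed by SaltedKey must be re-aggregated by the original key afterwards.
// The weight threshold and salt range are made up for illustration.
case class SaltedKey(key: String, salt: Int)

def saltKey(key: String, weight: Double, maxSalt: Int = 8): SaltedKey =
  if (weight > 10.0) SaltedKey(key, Random.nextInt(maxSalt)) else SaltedKey(key, 0)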

i don't know if either of these are beneficial, but throwing them out for
the sake of conversation...

-chris


On Fri, May 2, 2014 at 11:10 AM, Andrew Ash  wrote:

> Deenar,
>
> I haven't heard of any activity to do partitioning in that way, but it
> does seem more broadly valuable.
>
>
> On Fri, May 2, 2014 at 10:15 AM, deenar.toraskar 
> wrote:
>
>> I have equal sized partitions now, but I want the RDD to be partitioned
>> such
>> that the partitions are equally weighted by some attribute of each RDD
>> element (e.g. size or complexity).
>>
>> I have been looking at the RangePartitioner code and I have come up with
>> something like
>>
>> EquallyWeightedPartitioner(noOfPartitions, weightFunction)
>>
>> 1) take a sum or (sample) of complexities of all elements and calculate
>> average weight per partition
>> 2) take a histogram of weights
>> 3) assign a list of partitions to each bucket
>> 4)  getPartition(key: Any): Int would
>>   a) get the weight and then find the bucket
>>   b) assign a random partition from the list of partitions associated with
>> each bucket
>>
>> Just wanted to know if someone else had come across this issue before and
>> there was a better way of doing this.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171p5212.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Equally weighted partitions in Spark

2014-05-02 Thread Andrew Ash
Deenar,

I haven't heard of any activity to do partitioning in that way, but it does
seem more broadly valuable.


On Fri, May 2, 2014 at 10:15 AM, deenar.toraskar wrote:

> I have equal sized partitions now, but I want the RDD to be partitioned
> such
> that the partitions are equally weighted by some attribute of each RDD
> element (e.g. size or complexity).
>
> I have been looking at the RangePartitioner code and I have come up with
> something like
>
> EquallyWeightedPartitioner(noOfPartitions, weightFunction)
>
> 1) take a sum or (sample) of complexities of all elements and calculate
> average weight per partition
> 2) take a histogram of weights
> 3) assign a list of partitions to each bucket
> 4)  getPartition(key: Any): Int would
>   a) get the weight and then find the bucket
>   b) assign a random partition from the list of partitions associated with
> each bucket
>
> Just wanted to know if someone else had come across this issue before and
> there was a better way of doing this.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171p5212.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Equally weighted partitions in Spark

2014-05-02 Thread deenar.toraskar
I have equal sized partitions now, but I want the RDD to be partitioned such
that the partitions are equally weighted by some attribute of each RDD
element (e.g. size or complexity).

I have been looking at the RangePartitioner code and I have come up with
something like

EquallyWeightedPartitioner(noOfPartitions, weightFunction)

1) take the sum (or a sample) of the complexities of all elements and
calculate the average weight per partition
2) take a histogram of weights
3) assign a list of partitions to each bucket
4)  getPartition(key: Any): Int would
  a) get the weight and then find the bucket
  b) assign a random partition from the list of partitions associated with
each bucket

Just wanted to know if someone else had come across this issue before and
there was a better way of doing this.
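
As a toy illustration of steps 1-3 (numbers invented): with noOfPartitions = 4
and element weights summing to 100, the average weight per partition is 25. If
the histogram has three buckets carrying total weights 10, 70 and 20, the two
light buckets could each be assigned a single partition while the heavy middle
bucket is assigned a list of two or three partitions; getPartition then finds
the bucket for a key's weight and picks one partition uniformly at random from
that bucket's list.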



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171p5212.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Equally weighted partitions in Spark

2014-05-02 Thread Syed A. Hashmi
You can override the default partitioner with a range partitioner, which
distributes data in roughly equal sized partitions.
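
A minimal usage sketch (hypothetical data and partition count), assuming the
keys have an Ordering:

import org.apache.spark.{RangePartitioner, SparkContext}
import org.apache.spark.SparkContext._

// Hypothetical example: range-partition an RDD of (id, payload) pairs into 250
// partitions. RangePartitioner samples the RDD and picks range bounds so that
// partitions hold roughly equal numbers of records (not equal total weight).
val sc = new SparkContext("local[*]", "range-partition-example")
val pairs = sc.parallelize(1 to 100000).map(i => (i, s"instrument-$i"))
val partitioned = pairs.partitionBy(new RangePartitioner(250, pairs))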


On Thu, May 1, 2014 at 11:14 PM, deenar.toraskar wrote:

> Yes
>
> On a job I am currently running, 99% of the partitions finish within
> seconds
> and a couple of partitions take around and hour to finish. I am pricing
> some
> instruments and complex instruments take far longer to price than plain
> vanilla ones. If I could distribute these complex instruments evenly, the
> overall job times would greatly reduce.
>
>
> Deenar
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171p5208.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Equally weighted partitions in Spark

2014-05-01 Thread deenar.toraskar
Yes

On a job I am currently running, 99% of the partitions finish within seconds
and a couple of partitions take around an hour to finish. I am pricing some
instruments, and the complex instruments take far longer to price than the
plain vanilla ones. If I could distribute these complex instruments evenly,
the overall job times would greatly reduce.


Deenar



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171p5208.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Equally weighted partitions in Spark

2014-05-01 Thread Andrew Ash
The problem is that equally-sized partitions take variable time to complete
based on their contents?

Sent from my mobile phone
On May 1, 2014 8:31 AM, "deenar.toraskar"  wrote:

> Hi
>
> I am using Spark to distribute computationally intensive tasks across the
> cluster. Currently I partition my RDD of tasks randomly. There is a large
> variation in how long each of the jobs takes to complete, so most partitions
> are processed quickly while a couple of partitions take forever to complete.
> I can mitigate this problem by increasing the number of partitions to some
> extent.
>
> Ideally I would like to partition tasks by complexity (let's assume I can
> get such a value from the task object) so that the total complexity of the
> elements in each partition is roughly the same. Has anyone created such a
> partitioner before?
>
>
> Regards
> Deenar
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>