This is my first implementation. There are still a few rough edges, but when I run
it I get the exception below. The class extends Partitioner, which in turn extends
Serializable, so I'm not sure where the non-serializable scala.util.Random is being
pulled in from. Any idea what I am doing wrong? (A minimal sketch of the closure-capture
pattern I suspect is included after the stack trace.)

scala> res156.partitionBy(new EqualWeightPartitioner(1000, res156, weightFunction))
14/05/09 16:59:36 INFO SparkContext: Starting job: histogram at <console>:197
14/05/09 16:59:36 INFO DAGScheduler: Got job 18 (histogram at <console>:197) with 250 output partitions (allowLocal=false)
14/05/09 16:59:36 INFO DAGScheduler: Final stage: Stage 18 (histogram at <console>:197)
14/05/09 16:59:36 INFO DAGScheduler: Parents of final stage: List()
14/05/09 16:59:36 INFO DAGScheduler: Missing parents: List()
14/05/09 16:59:36 INFO DAGScheduler: Submitting Stage 18 (MapPartitionsRDD[36] at histogram at <console>:197), which has no missing parents
14/05/09 16:59:36 INFO DAGScheduler: Failed to run histogram at <console>:197
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: scala.util.Random
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
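
For reference, my understanding is that "Task not serializable" means a closure shipped
to the executors captured something Java serialization cannot handle. A minimal sketch of
the pattern I suspect, with placeholder names (this is not my actual code), is:

import scala.util.Random
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Hypothetical example only: a Serializable class that holds a scala.util.Random field.
// On Scala 2.10, scala.util.Random is not Serializable, so any task closure that drags
// in the enclosing instance fails with NotSerializableException: scala.util.Random.
class WeightedThing(@transient sc: SparkContext) extends Serializable {
  private val rnd = new Random            // non-serializable member

  def weight(k: Long): Double = k.toDouble + rnd.nextDouble()

  def buildHistogram(): (Array[Double], Array[Long]) = {
    val data = sc.parallelize(1L to 1000L)
    // The closure below calls this.weight, so the whole WeightedThing instance
    // (including rnd) must be serialized with the task -- and it cannot be.
    data.map(k => weight(k)).histogram(10)
  }
}

In my class below the only Random I create is local to getPartition, so I assume something
similar is being captured through weightFunction or the shell, but I cannot see where.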
==================================


import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

/**
 * A [[org.apache.spark.Partitioner]] that partitions records into roughly equal-weight
 * partitions. Partition boundaries are derived from a histogram of the weight function
 * applied to the keys of the RDD passed in.
 */
class EqualWeightPartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    weightFunction: K => Double) extends Partitioner {

  private val ordering = implicitly[Ordering[K]]

  // Histogram of key weights: _1 is the array of bucket boundaries, _2 the bucket counts.
  private val histogram =
    rdd.map(x => (x._1, weightFunction(x._1))).map(x => x._2.toDouble).histogram(partitions)
  private val bucketSize = histogram._1(1) - histogram._1(0)

  // TODO: refine this algorithm to better handle the single-partition case

  // For each histogram bucket, the inclusive (low, high) range of partition indices it maps to
  private val bucketToPartitionMap: Array[(Int, Int)] = {
    if (partitions == 1) {
      Array()
    } else {
      // cumulative weight up to and including each bucket, using the bucket midpoint as its weight
      val bucketWeights = histogram._2.zipWithIndex
        .map { case (count, idx) => count * (histogram._1(idx) + histogram._1(idx + 1)) / 2 }
        .scanLeft(0.0)(_ + _)
        .tail
      val averageWeight = bucketWeights.last / partitions
      val bucketPartition: Array[(Int, Int)] =
        bucketWeights.map(w => Math.max(0, Math.round(w / averageWeight) - 1).toInt).zipWithIndex
      bucketPartition.map { x =>
        x._2 match {
          case 0 => (0, x._1)
          case _ => (Math.min(partitions - 1, bucketPartition(x._2 - 1)._1 + 1), x._1)
        }
      }
    }
  }

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    val rnd = new Random
    val k = key.asInstanceOf[K]
    val weight: Double = weightFunction(k)
    val bucketIndex: Int = Math.min(1, (weight / bucketSize).toInt)
    val partitionRange: (Int, Int) = bucketToPartitionMap(bucketIndex - 1)
    // pick a uniformly random partition within the bucket's partition range
    partitionRange._1 + rnd.nextInt(partitionRange._2 - partitionRange._1 + 1)
  }

  override def equals(other: Any): Boolean = other match {
    case r: EqualWeightPartitioner[_,_] =>
      r.bucketToPartitionMap.sameElements(bucketToPartitionMap)
    case _ =>
      false
  }

}
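
For completeness, this is roughly how I am driving it from spark-shell; the pair RDD and
weight function here are placeholders standing in for my real data (the res156 in the log
above):

// Placeholder pair RDD keyed by Long, standing in for res156 in the trace above.
val pairs = sc.parallelize(1L to 100000L).map(k => (k, s"value-$k"))

// Placeholder weight function; my real one is defined earlier in the shell session.
val weightFunction: Long => Double = k => math.log(k.toDouble + 1)

// 1000 target partitions, matching the call shown in the log.
val partitioned = pairs.partitionBy(new EqualWeightPartitioner(1000, pairs, weightFunction))
partitioned.partitions.length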


