This is my first implementation. There are a few rough edges, but when I run this I get the following exception. The class extends Partitioner which in turn extends Serializable. Any idea what I am doing wrong?
scala> res156.partitionBy(new EqualWeightPartitioner(1000, res156, weightFunction)) 14/05/09 16:59:36 INFO SparkContext: Starting job: histogram at <console>:197 14/05/09 16:59:36 INFO DAGScheduler: Got job 18 (histogram at <console>:197) with 250 output partitions (allowLocal=false) 14/05/09 16:59:36 INFO DAGScheduler: Final stage: Stage 18 (histogram at <console>:197) 14/05/09 16:59:36 INFO DAGScheduler: Parents of final stage: List() 14/05/09 16:59:36 INFO DAGScheduler: Missing parents: List() 14/05/09 16:59:36 INFO DAGScheduler: Submitting Stage 18 (MapPartitionsRDD[36] at histogram at <console>:197), which has no missing parents 14/05/09 16:59:36 INFO DAGScheduler: Failed to run histogram at <console>:197 org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: scala.util.Random at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ================================== import scala.reflect.ClassTag import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils import org.apache.spark.Partitioner import java.util import scala.Array import scala.reflect._ import scala.util.Random /** * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly * equal ranges. The ranges are determined by sampling the content of the RDD passed in. */ class EqualWeightPartitioner[K : Ordering : ClassTag, V]( partitions: Int, @transient rdd: RDD[_ <: Product2[K,V]], weightFunction: K => Double) extends Partitioner { private val ordering = implicitly[Ordering[K]] private val histogram = rdd.map(x=> (x._1, weightFunction(x._1))).map(x=> x._2.toDouble).histogram(partitions) private val bucketSize = (histogram._1(1) - histogram._1(0)) // need to refine this algorithm to use single partitions // An array of upper bounds for the first (partitions - 1) partitions private val bucketToPartitionMap: Array[(Int, Int)] = { if (partitions == 1) { Array() } else { val bucketWeights = histogram._2.zipWithIndex.map{case(y,idx)=> (y*(histogram._1(idx) + histogram._1(idx+1))/2)}.map{var s :Double= 0.0; d => {s += d; s}} val averageWeight = bucketWeights.last/partitions val bucketPartition : Array[(Int, Int)] = bucketWeights.map(x=>Math.max(0,Math.round(x/averageWeight)-1).toInt).zipWithIndex bucketPartition.map(x=>x._2 match { case 0 => (0, x._1) case _ => (Math.min(partitions-1,bucketPartition(x._2-1)._1+1), x._1)}) } } def numPartitions = partitions def getPartition(key: Any): Int = { val rnd = new Random val k = key.asInstanceOf[K] val weight : Double = weightFunction(k) val bucketIndex : Int = Math.min(1,(weight/bucketSize).toInt) val partitionRange : (Int, Int) = bucketToPartitionMap(bucketIndex - 1) partitionRange._1 + rnd.nextInt((partitionRange._2 - partitionRange._1 + 1 )) } override def equals(other: Any): Boolean = other match { case r: EqualWeightPartitioner[_,_] => r.bucketToPartitionMap.sameElements(bucketToPartitionMap) case _ => false } } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171p5525.html Sent from the Apache Spark User List mailing list archive at Nabble.com.