Re: Splitting RDD by partition
I think the latter approach is better: it avoids unnecessary computation by filtering out the unneeded partitions. It is also better to cache the previous RDD so that it won't be computed twice.

> On May 20, 2016, at 16:59, shlomi <shlomivak...@gmail.com> wrote:
>
> Another approach I found: [...]
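To make the caching advice concrete, here is a minimal sketch assuming the splitByPartition helper from the quoted posts below; keyedRdd, myPartitioner, transform, and the output path are hypothetical stand-ins, not names from the thread:
-
import org.apache.spark.storage.StorageLevel

// Persist the partitioned parent once, so that materializing each half
// does not recompute the full lineage a second time.
val parent = keyedRdd.partitionBy(myPartitioner).persist(StorageLevel.MEMORY_AND_DISK)

val (hot, cold) = splitByPartition(parent, hotPartitions = 4)
hot.saveAsTextFile("hdfs:///out/hot") // hot partitions are finished; write them out
val rest = cold.map(transform)        // cold partitions still need further steps
-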
Re: Splitting RDD by partition
Another approach I found:

First, I make a PartitionsRDD class which only takes a certain range of partitions:
-
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Wraps one of the parent's partitions under a new index.
case class PartitionsRDDPartition(index: Int, origSplit: Partition) extends Partition

// An RDD that exposes only a contiguous range of the parent's partitions:
// it skips the first `drop` partitions and keeps the next `take` of them.
class PartitionsRDD[U: ClassTag](var prev: RDD[U], drop: Int, take: Int)
    extends RDD[U](prev) {

  override def getPartitions: Array[Partition] =
    prev.partitions.drop(drop).take(take).zipWithIndex.map {
      case (split, idx) => new PartitionsRDDPartition(idx, split)
    }.asInstanceOf[Array[Partition]]

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(split.asInstanceOf[PartitionsRDDPartition].origSplit, context)
}
-

And then I can create my two RDDs using the following:
-
// Split an RDD into (first hotPartitions partitions, remaining partitions).
def splitByPartition[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  val left  = new PartitionsRDD[T](rdd, 0, hotPartitions)
  val right = new PartitionsRDD[T](rdd, hotPartitions, rdd.partitions.length - hotPartitions)
  (left, right)
}
-

This approach saves a few minutes compared to the one in the previous post (at least in a local test; I still need to try it on a real cluster).

Any thoughts on this? Is this the right thing to do, or am I missing something important?
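For what it's worth, a quick usage sketch of the range-based split above; the data and partition counts are invented for illustration, and sc is assumed to be an existing SparkContext:
-
// Ten partitions; caching lets the two halves share one evaluation of the parent.
val rdd = sc.parallelize(1 to 100, 10).cache()

val (hot, cold) = splitByPartition(rdd, hotPartitions = 4)
// hot wraps parent partitions 0-3, cold wraps parent partitions 4-9.
println(hot.partitions.length)  // 4
println(cold.partitions.length) // 6
-
Since each PartitionsRDD is a narrow, one-to-one dependency on its parent, the split itself adds no shuffle; without caching, each half recomputes the parent's lineage for its own partitions when materialized.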
Splitting RDD by partition
Hey Sparkers,

I have a workflow where I have to ensure that certain keys always end up in the same RDD partition (it's a mandatory algorithmic invariant). I can easily achieve this with a custom partitioner.

This results in a single RDD that requires further computation. However, there are currently two completely different computations for different partitions: some partitions are finished at this step and could already be written to disk, while the rest still need a few more map/filter/shuffle/etc. steps to complete.

The simplest idea I have for this is to split the RDD into multiple RDDs based on partition number (which I know from my custom partitioner). I have managed to achieve this like so (splitting into only 2 RDDs):

https://gist.github.com/vadali/3e5f832e4a6cb320e50b67dd05b3e97c
-
import scala.reflect.ClassTag

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

// Split an RDD according to its partition number: partitions with index
// below hotPartitions go left, the rest go right. Each partition of
// `splits` yields two iterators, (left, right), exactly one of them empty.
def splitByPartition[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  val splits = rdd.mapPartitions { iter =>
    val partId = TaskContext.get.partitionId
    val left  = if (partId <  hotPartitions) iter else Iterator.empty
    val right = if (partId >= hotPartitions) iter else Iterator.empty
    Seq(left, right).iterator
  }

  val left  = splits.mapPartitions { iter => iter.next() }              // take the left iterator
  val right = splits.mapPartitions { iter => iter.next(); iter.next() } // skip left, take right
  (left, right)
}
-

Is this the best way? It seems to cause some shuffling, but I am not sure how it impacts performance. Is there another way, maybe even a more involved one, to achieve this?

Thanks!
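For completeness, here is a minimal sketch of the kind of custom partitioner described above: it pins a known set of hot keys into the first hotPartitions partitions, so the hot and cold halves can later be separated by partition index. The class name, the key type, and the pinning rule are assumptions for illustration, not code from the original workflow:
-
import org.apache.spark.Partitioner

// Hypothetical partitioner: hot keys map into [0, hotPartitions),
// everything else into [hotPartitions, totalPartitions).
class HotKeyPartitioner(hotKeys: Set[String], hotPartitions: Int, totalPartitions: Int)
    extends Partitioner {
  require(0 < hotPartitions && hotPartitions < totalPartitions)

  override def numPartitions: Int = totalPartitions

  // Non-negative modulo, since hashCode may be negative.
  private def mod(h: Int, n: Int): Int = ((h % n) + n) % n

  override def getPartition(key: Any): Int = key match {
    // A given hot key always lands in the same hot partition.
    case k: String if hotKeys.contains(k) => mod(k.hashCode, hotPartitions)
    // Everything else hashes into the cold range.
    case other => hotPartitions + mod(other.hashCode, totalPartitions - hotPartitions)
  }
}
-
A pair RDD keyed by string could then be repartitioned with rdd.partitionBy(new HotKeyPartitioner(...)) before being split by partition number as above.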