Hey Sparkers, I have a workflow where I have to ensure certain keys are always in the same RDD partition (it's a mandatory algorithmic invariant). I can easily achieve this with a custom partitioner.
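For context, a stripped-down sketch of the kind of partitioner I mean (the HotKeyPartitioner name, the string keys, and the rule of pinning hot keys to partition 0 are all illustrative assumptions, not the real code):

-----------------------------------------------------
import org.apache.spark.Partitioner

// Illustrative only: pins a known set of "hot" keys to partition 0 so
// they always land together; everything else is hash-partitioned into
// the remaining partitions.
class HotKeyPartitioner(numParts: Int, hotKeys: Set[String]) extends Partitioner {
  require(numParts > 1, "need at least one hot and one regular partition")

  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = key match {
    // All hot keys share partition 0 -- the invariant we care about.
    case k: String if hotKeys.contains(k) => 0
    // Non-negative modulo so a negative hashCode cannot produce a
    // negative partition id.
    case k =>
      val mod = k.hashCode % (numParts - 1)
      1 + (if (mod < 0) mod + (numParts - 1) else mod)
  }
}
-----------------------------------------------------

It would be used as, e.g., pairRdd.partitionBy(new HotKeyPartitioner(8, hotKeys)).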
This results in a single RDD that requires further computation. However, at this point the partitions need two completely different computations: some partitions are done after this step and could already be written to disk, while the rest still need a few more map/filter/shuffle/etc. steps to complete.

The simplest idea I have for this is to split the RDD into multiple RDDs based on partition numbers (which I know from my custom partitioner). I have managed to achieve this like so (splitting into only 2 RDDs): https://gist.github.com/vadali/3e5f832e4a6cb320e50b67dd05b3e97c

-----------------------------------------------------
import scala.reflect.ClassTag
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

// Split an RDD in two according to its partition number: partitions
// below hotPartitions go left, the rest go right.
def splitByPartition[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  // Each partition emits a pair of iterators; exactly one of the two
  // carries the partition's data, the other is empty.
  val splits = rdd.mapPartitions { iter =>
    val partId = TaskContext.get.partitionId
    val left  = if (partId < hotPartitions)  iter else Iterator.empty
    val right = if (partId >= hotPartitions) iter else Iterator.empty
    Seq(left, right).iterator
  }
  // The first element of each partition is the "left" iterator...
  val left = splits.mapPartitions { iter => iter.next() }
  // ...and the second is the "right" one.
  val right = splits.mapPartitions { iter =>
    iter.next()
    iter.next()
  }
  (left, right)
}
-----------------------------------------------------

Is this the best way? It seems to cause some shuffling, but I am not sure how that impacts performance. Is there another way, maybe even a more involved way, to achieve this?

Thanks!
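For reference, the same split can also be expressed with mapPartitionsWithIndex (just a sketch, not a claim that it is better; the empty partitions still exist in each output RDD, and the parent is recomputed once per side unless it is cached):

-----------------------------------------------------
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Same split via mapPartitionsWithIndex: each output RDD keeps only
// the partitions on its side of the cutoff. No shuffle is introduced,
// but rdd is evaluated once per side unless it is cached first.
def splitByIndex[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  val left = rdd.mapPartitionsWithIndex(
    (id, iter) => if (id < hotPartitions) iter else Iterator.empty,
    preservesPartitioning = true)
  val right = rdd.mapPartitionsWithIndex(
    (id, iter) => if (id >= hotPartitions) iter else Iterator.empty,
    preservesPartitioning = true)
  (left, right)
}
-----------------------------------------------------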