Re: Splitting RDD by partition

2016-05-20 Thread Sun Rui
I think the latter approach is better, since it can avoid unnecessary computation by
filtering out unneeded partitions.
It is also better to cache the previous RDD so that it won't be computed twice.
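For example, a minimal sketch of what I mean (it reuses the splitByPartition from the post
below; the partitioner, path and numbers are only placeholders):
-
import org.apache.spark.HashPartitioner

// Cache the keyed RDD before splitting, so that both resulting RDDs
// read the same computed partitions instead of recomputing the parent.
val keyed = sc.parallelize(1 to 1000)
  .map(x => (x % 8, x))
  .partitionBy(new HashPartitioner(8))
keyed.cache()                                 // or persist(StorageLevel.MEMORY_AND_DISK)

val (hot, cold) = splitByPartition(keyed, 3)  // first 3 partitions vs. the rest
hot.saveAsObjectFile("/tmp/hot-partitions")   // placeholder output path
val processed = cold.mapValues(_ * 2)         // further transformations on the rest
-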






Re: Splitting RDD by partition

2016-05-20 Thread shlomi
Another approach I found:

First, I make a PartitionsRDD class which exposes only a certain range of the parent's
partitions:
- 
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Wraps one partition of the parent RDD under a new, contiguous index.
case class PartitionsRDDPartition(index: Int, origSplit: Partition) extends Partition

// An RDD that exposes only a contiguous range of its parent's partitions.
class PartitionsRDD[U: ClassTag](var prev: RDD[U], drop: Int, take: Int)
  extends RDD[U](prev) {

  override def getPartitions: Array[Partition] =
    prev.partitions.drop(drop).take(take).zipWithIndex.map { case (split, idx) =>
      new PartitionsRDDPartition(idx, split)
    }.asInstanceOf[Array[Partition]]

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(partitions(split.index).asInstanceOf[PartitionsRDDPartition].origSplit, context)
}
- 

And then I can create my two RDDs using the following:
- 
def splitByPartition[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  // The first hotPartitions partitions go left, the remaining ones go right.
  val left  = new PartitionsRDD[T](rdd, 0, hotPartitions)
  val right = new PartitionsRDD[T](rdd, hotPartitions, rdd.partitions.length - hotPartitions)
  (left, right)
}
- 
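For reference, a quick local sanity check of the split (sizes and numbers are just an
illustration):
-
val rdd = sc.parallelize(1 to 1000, 8)        // 8 partitions
val (hot, cold) = splitByPartition(rdd, 3)

println(hot.partitions.length)                // 3
println(cold.partitions.length)               // 5
println(hot.count() + cold.count())           // 1000 -- nothing lost or duplicated
-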

This approach saves a few minutes when compared to the one in the previous post (at least
in a local test; I still need to try it on a real cluster).

Any thoughts about this? Is this the right thing to do, or am I missing something
important?






Splitting RDD by partition

2016-05-19 Thread shlomi
Hey Sparkers,

I have a workflow where I have to ensure certain keys are always in the same RDD partition
(it's a mandatory algorithmic invariant). I can easily achieve this by having a custom
partitioner, as sketched below.
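For concreteness, a minimal sketch of the kind of partitioner I mean (the hot-key set and
partition counts here are hypothetical; the real routing logic is domain-specific):
-
import org.apache.spark.Partitioner

// Keys in the hot set always land together in the first hotPartitions partitions;
// all other keys are hashed into the remaining coldPartitions partitions.
class HotKeyPartitioner(hotKeys: Set[String], hotPartitions: Int, coldPartitions: Int)
  extends Partitioner {

  override def numPartitions: Int = hotPartitions + coldPartitions

  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val m = x % mod
    if (m < 0) m + mod else m
  }

  override def getPartition(key: Any): Int = {
    val k = key.toString
    if (hotKeys.contains(k)) nonNegativeMod(k.hashCode, hotPartitions)
    else hotPartitions + nonNegativeMod(k.hashCode, coldPartitions)
  }
}
-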

This results in a single RDD that requires further computation. However, there are
currently two completely different computations for different partitions. Some partitions
are done at this step and could already be written to disk, while the rest still need a
few more map/filter/shuffle/etc. steps to complete.

The simplest idea I have for this is to have some way to split the RDD into multiple RDDs
based on partition number (which I know from my custom partitioner).

I have managed to achieve this like so (splitting into only 2 RDDs):
https://gist.github.com/vadali/3e5f832e4a6cb320e50b67dd05b3e97c
-
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Split an RDD in two according to its partition number
def splitByPartition[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  // Each partition emits a pair of iterators: its data in the first ("left") slot
  // if it is a hot partition, otherwise in the second ("right") slot.
  val splits = rdd.mapPartitions { iter =>
    val partId = TaskContext.get.partitionId
    val left  = if (partId <  hotPartitions) iter else Iterator.empty
    val right = if (partId >= hotPartitions) iter else Iterator.empty
    Seq(left, right).iterator
  }

  val left  = splits.mapPartitions { iter => iter.next().toIterator }
  val right = splits.mapPartitions { iter =>
    iter.next()             // skip the "left" slot
    iter.next().toIterator
  }
  (left, right)
}
-
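A quick usage sketch of what I get locally (numbers are just for a small test; the real
data uses my custom partitioner):
-
val rdd = sc.parallelize(1 to 1000, 10)       // 10 partitions
val (hot, cold) = splitByPartition(rdd, 4)

println(hot.count())                          // records from partitions 0-3
println(cold.count())                         // records from partitions 4-9
println(hot.partitions.length)                // still 10 -- the cold partitions are just empty
-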

Is this the best way? This seems to cause some shuffling, but I am not sure how it impacts
performance.

Is there another way, maybe even a more involved way, to achieve this? 

Thanks!


