Re: RDD from partitions

2015-08-28 Thread Rishitesh Mishra
Hi Jem,
A simple way to get this is to use mapPartitions (which produces a
MapPartitionsRDD). Please see the code below. For this you need to know
which of the parent RDD's partition numbers you want to exclude. One
drawback is that the new RDD will still launch the same number of tasks
as the parent RDD, since both RDDs have the same number of partitions;
we are only excluding the results from certain partitions. If you can
live with that, then it's fine.

import org.apache.spark.TaskContext

val ones = sc.makeRDD(1 to 100, 10).map(x => x) // base RDD

// Reduced RDD: returns an empty iterator for partitions 0, 1 and 2
val reduced = ones.mapPartitions { iter =>
  new Iterator[Int] {
    override def hasNext: Boolean = {
      if (Seq(0, 1, 2).contains(TaskContext.get().partitionId)) {
        false
      } else {
        iter.hasNext
      }
    }

    override def next(): Int = iter.next()
  }
}

reduced.collect().foreach(println)
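
The same thing can be written with mapPartitionsWithIndex, which is also
part of the stable RDD API and hands you the partition index directly
instead of going through TaskContext. A minimal, untested sketch (the
excluded partition numbers 0-2 are just an example):

val excluded = Set(0, 1, 2) // example partition numbers to drop

val reduced2 = ones.mapPartitionsWithIndex { (index, iter) =>
  // return an empty iterator for excluded partitions, pass others through
  if (excluded.contains(index)) Iterator.empty else iter
}

reduced2.collect().foreach(println)

Either way the job still schedules one task per parent partition.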




On Fri, Aug 28, 2015 at 12:33 PM, Jem Tucker jem.tuc...@gmail.com wrote:

 Hi,

 I am trying to create an RDD from a selected number of its parent's
 partitions. My current approach is to create my own SelectedPartitionRDD
 and implement compute and numPartitions myself; the problem is that the
 compute method is marked as @DeveloperApi, and hence unsuitable for me to
 use in my application. Are there any alternative methods that will only
 use the stable parts of the Spark API?

 Thanks,

 Jem



Re: RDD from partitions

2015-08-28 Thread Jem Tucker
Hey Rishitesh,

That's perfect, thanks so much! Don't know why I didn't think of using
mapPartitions like this.

Thanks,

Jem
