Modifications as suggested in PR feedback: more variants of mapPartitions added to JavaRDDLike; move setGenerator to JavaRDDLike; clean up
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1442cd5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1442cd5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1442cd5d Branch: refs/heads/master Commit: 1442cd5d5099de71747b1cccf463b94fdedcda1f Parents: e922973 Author: Saurabh Rawat <sr.ekla...@gmail.com> Authored: Tue Jan 14 14:19:02 2014 +0530 Committer: Saurabh Rawat <sr.ekla...@gmail.com> Committed: Tue Jan 14 14:19:02 2014 +0530 ---------------------------------------------------------------------- .../org/apache/spark/api/java/JavaRDD.scala | 9 +------- .../org/apache/spark/api/java/JavaRDDLike.scala | 22 ++++++++++++++++++++ 2 files changed, 23 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1442cd5d/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index e687bbd..7d48ce0 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -21,10 +21,8 @@ import scala.reflect.ClassTag import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap, VoidFunction} +import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel -import java.util.{Iterator => JIterator} -import scala.collection.JavaConversions._ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends JavaRDDLike[T, JavaRDD[T]] { @@ -135,11 +133,6 @@ JavaRDDLike[T, JavaRDD[T]] { rdd.setName(name) this } - - /** Reset 
generator*/ - def setGenerator(_generator: String) = { - rdd.setGenerator(_generator) - } } object JavaRDD { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1442cd5d/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index eb8e34e..808c907 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -157,6 +157,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { } /** + * Return a new RDD by applying a function to each partition of this RDD. + */ + def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue())) + } + + /** + * Return a new RDD by applying a function to each partition of this RDD. + */ + def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean): + JavaPairRDD[K2, V2] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType()) + } + + /** * Applies a function f to each partition of this RDD. */ def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) { @@ -476,4 +493,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) def name(): String = rdd.name + + /** Reset generator */ + def setGenerator(_generator: String) = { + rdd.setGenerator(_generator) + } }