[ https://issues.apache.org/jira/browse/SPARK-3489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14136738#comment-14136738 ]
Mohit Jaggi commented on SPARK-3489: ------------------------------------ Proposed diff --- MohitMacBook:spark mohit$ git diff diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a9b905b..2c9f034 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -711,6 +711,21 @@ abstract class RDD[T: ClassTag]( } } } + + /** + * Zips this RDD with a sequence of other RDDs, returning arrays that hold the first element of every RDD, + * then the second element of every RDD, etc. Assumes that all the RDDs have the *same number of + * partitions* and the *same number of elements in each partition* (e.g. they were all made through + * maps on a common parent). + */ + def zip(others: Seq[RDD[_]]): RDD[Array[Any]] = { + zipPartitions(others, preservesPartitioning = false) { iterSeq: Seq[Iterator[Any]] => + new Iterator[Array[Any]] { + def hasNext = !iterSeq.exists(! _.hasNext) + def next = iterSeq.map { iter => iter.next }.toArray + } + } + } /** * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by @@ -748,7 +763,11 @@ abstract class RDD[T: ClassTag]( (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false) - + def zipPartitions[V: ClassTag] + (others: Seq[RDD[_]], preservesPartitioning: Boolean) + (f: (Seq[Iterator[Any]]) => Iterator[V]): RDD[V] = + new ZippedPartitionsRDDn(sc, sc.clean(f), this +: others, false) + // Actions (launch a job to return a value to the user program) /** diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index f3d30f6..d22d7d3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -146,3 +146,22 @@ private[spark]
class ZippedPartitionsRDD4 rdd4 = null } } + +private[spark] class ZippedPartitionsRDDn + [V: ClassTag]( + sc: SparkContext, + f: (Seq[Iterator[_]] => Iterator[V]), + var rddSeq: Seq[RDD[_]], + preservesPartitioning: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, rddSeq, preservesPartitioning) { + + override def compute(s: Partition, context: TaskContext): Iterator[V] = { + val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions + f(rdds.zipWithIndex.map (rdd => rdd._1.iterator(partitions(rdd._2), context))) + } + + override def clearDependencies() { + super.clearDependencies() + rdds = null + } +} (END) > support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params > ---------------------------------------------------------------------- > > Key: SPARK-3489 > URL: https://issues.apache.org/jira/browse/SPARK-3489 > Project: Spark > Issue Type: Improvement > Components: Spark Core > Affects Versions: 1.0.2 > Reporter: Mohit Jaggi > Fix For: 1.0.3 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org