Updated Branches: refs/heads/master 7bafb68d7 -> 588a1695f
Added partition aware union to improve reduceByKeyAndWindow Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2ec4b2e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2ec4b2e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2ec4b2e3 Branch: refs/heads/master Commit: 2ec4b2e38d432ef4f21b725c2fceac863d5f9ea1 Parents: e2ebc3a Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Wed Nov 20 23:49:30 2013 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed Nov 20 23:49:30 2013 -0800 ---------------------------------------------------------------------- .../streaming/dstream/WindowedDStream.scala | 51 +++++++++++++++++++- 1 file changed, 49 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2ec4b2e3/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 3c57294..03f522e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -20,7 +20,12 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Interval, Time, DStream} +import org.apache.spark.streaming._ +import org.apache.spark._ +import scala.Some +import scala.Some +import scala.Some +import org.apache.spark.streaming.Duration private[streaming] class WindowedDStream[T: ClassManifest]( @@ -49,9 +54,51 @@ class WindowedDStream[T: ClassManifest]( override def compute(validTime: Time): Option[RDD[T]] = { val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) - Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) + val rddsInWindow = parent.slice(currentWindow) + val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { + logInfo("Using partition aware union") + new PartitionAwareUnionRDD(ssc.sc, rddsInWindow) + } else { + logInfo("Using normal union") + new UnionRDD(ssc.sc,rddsInWindow) + } + Some(windowRDD) } } +private[streaming] +class PartitionAwareUnionRDDPartition(val idx: Int, val partitions: Array[Partition]) + extends Partition { + override val index = idx + override def hashCode(): Int = idx +} + +private[streaming] +class PartitionAwareUnionRDD[T: ClassManifest]( + sc: SparkContext, + var rdds: Seq[RDD[T]]) + extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) { + require(rdds.length > 0) + require(rdds.flatMap(_.partitioner).distinct.length == 1, "Parent RDDs have different partitioners") + + override val partitioner = rdds.head.partitioner + + override def getPartitions: Array[Partition] = { + val numPartitions = rdds.head.partitions.length + (0 until numPartitions).map(index => { + val parentPartitions = rdds.map(_.partitions(index)).toArray + new PartitionAwareUnionRDDPartition(index, parentPartitions) + }).toArray + } + + override def compute(s: Partition, context: TaskContext): Iterator[T] = { + val parentPartitions = s.asInstanceOf[PartitionAwareUnionRDDPartition].partitions + rdds.zip(parentPartitions).iterator.flatMap { + case (rdd, p) => rdd.iterator(p, context) + } + } +} + +