Repository: spark
Updated Branches:
  refs/heads/master 18350a570 -> dba1a62cf


[SPARK-7316][MLLIB] RDD sliding window with step

Implements a step parameter for the sliding window function in MLlib's RDDFunctions.

Although one can use the current sliding window with step 1 and then keep only every Nth
window, that approach takes more time and space (N*data.count times more than needed).
For example, below are the results for various window sizes and steps on 10M data
points:

Window | Step | Time (s) | Windows produced
------------ | ------------- | ---------- | ----------
128 | 1 |  6.38 | 9999873
128 | 10 | 0.9 | 999988
128 | 100 | 0.41 | 99999
1024 | 1 | 44.67 | 9998977
1024 | 10 | 4.74 | 999898
1024 | 100 | 0.78 | 99990
```
import org.apache.spark.mllib.rdd.RDDFunctions._
val rdd = sc.parallelize(1 to 10000000, 10)
rdd.count
val window = 1024
val step = 1
val t = System.nanoTime(); val windows = rdd.sliding(window, step); 
println(windows.count); println((System.nanoTime() - t) / 1e9)
```

Author: unknown <ula...@ulanov3.americas.hpqcorp.net>
Author: Alexander Ulanov <na...@yandex.ru>
Author: Xiangrui Meng <m...@databricks.com>

Closes #5855 from avulanov/SPARK-7316-sliding.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba1a62c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba1a62c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba1a62c

Branch: refs/heads/master
Commit: dba1a62cf1baa9ae1ee665d592e01dfad78331a2
Parents: 18350a5
Author: unknown <ula...@ulanov3.americas.hpqcorp.net>
Authored: Tue Nov 10 14:25:06 2015 -0800
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Tue Nov 10 14:25:06 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/mllib/rdd/RDDFunctions.scala   | 11 ++-
 .../org/apache/spark/mllib/rdd/SlidingRDD.scala | 71 +++++++++++---------
 .../spark/mllib/rdd/RDDFunctionsSuite.scala     | 11 +--
 3 files changed, 54 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dba1a62c/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
index 7817284..19a047d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -37,16 +37,21 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) extends 
Serializable {
    * trigger a Spark job if the parent RDD has more than one partitions and 
the window size is
    * greater than 1.
    */
-  def sliding(windowSize: Int): RDD[Array[T]] = {
+  def sliding(windowSize: Int, step: Int): RDD[Array[T]] = {
     require(windowSize > 0, s"Sliding window size must be positive, but got 
$windowSize.")
-    if (windowSize == 1) {
+    if (windowSize == 1 && step == 1) {
       self.map(Array(_))
     } else {
-      new SlidingRDD[T](self, windowSize)
+      new SlidingRDD[T](self, windowSize, step)
     }
   }
 
   /**
+   * [[sliding(Int, Int)*]] with step = 1.
+   */
+  def sliding(windowSize: Int): RDD[Array[T]] = sliding(windowSize, 1)
+
+  /**
    * Reduces the elements of this RDD in a multi-level tree pattern.
    *
    * @param depth suggested depth of the tree (default: 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/dba1a62c/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
index 1facf83..ead8db6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
@@ -24,13 +24,13 @@ import org.apache.spark.{TaskContext, Partition}
 import org.apache.spark.rdd.RDD
 
 private[mllib]
-class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: 
Seq[T])
+class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: 
Seq[T], val offset: Int)
   extends Partition with Serializable {
   override val index: Int = idx
 }
 
 /**
- * Represents a RDD from grouping items of its parent RDD in fixed size blocks 
by passing a sliding
+ * Represents an RDD from grouping items of its parent RDD in fixed size 
blocks by passing a sliding
  * window over them. The ordering is first based on the partition index and 
then the ordering of
  * items within each partition. This is similar to sliding in Scala 
collections, except that it
  * becomes an empty RDD if the window size is greater than the total number of 
items. It needs to
@@ -40,19 +40,24 @@ class SlidingRDDPartition[T](val idx: Int, val prev: 
Partition, val tail: Seq[T]
  *
  * @param parent the parent RDD
  * @param windowSize the window size, must be greater than 1
+ * @param step step size for windows
  *
- * @see [[org.apache.spark.mllib.rdd.RDDFunctions#sliding]]
+ * @see [[org.apache.spark.mllib.rdd.RDDFunctions.sliding(Int, Int)*]]
+ * @see [[scala.collection.IterableLike.sliding(Int, Int)*]]
  */
 private[mllib]
-class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: 
Int)
+class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: 
Int, val step: Int)
   extends RDD[Array[T]](parent) {
 
-  require(windowSize > 1, s"Window size must be greater than 1, but got 
$windowSize.")
+  require(windowSize > 0 && step > 0 && !(windowSize == 1 && step == 1),
+    "Window size and step must be greater than 0, " +
+      s"and they cannot be both 1, but got windowSize = $windowSize and step = 
$step.")
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[Array[T]] = {
     val part = split.asInstanceOf[SlidingRDDPartition[T]]
     (firstParent[T].iterator(part.prev, context) ++ part.tail)
-      .sliding(windowSize)
+      .drop(part.offset)
+      .sliding(windowSize, step)
       .withPartial(false)
       .map(_.toArray)
   }
@@ -62,40 +67,42 @@ class SlidingRDD[T: ClassTag](@transient val parent: 
RDD[T], val windowSize: Int
 
   override def getPartitions: Array[Partition] = {
     val parentPartitions = parent.partitions
-    val n = parentPartitions.size
+    val n = parentPartitions.length
     if (n == 0) {
       Array.empty
     } else if (n == 1) {
-      Array(new SlidingRDDPartition[T](0, parentPartitions(0), Seq.empty))
+      Array(new SlidingRDDPartition[T](0, parentPartitions(0), Seq.empty, 0))
     } else {
-      val n1 = n - 1
       val w1 = windowSize - 1
-      // Get the first w1 items of each partition, starting from the second 
partition.
-      val nextHeads =
-        parent.context.runJob(parent, (iter: Iterator[T]) => 
iter.take(w1).toArray, 1 until n)
-      val partitions = mutable.ArrayBuffer[SlidingRDDPartition[T]]()
+      // Get partition sizes and first w1 elements.
+      val (sizes, heads) = parent.mapPartitions { iter =>
+        val w1Array = iter.take(w1).toArray
+        Iterator.single((w1Array.length + iter.length, w1Array))
+      }.collect().unzip
+      val partitions = mutable.ArrayBuffer.empty[SlidingRDDPartition[T]]
       var i = 0
+      var cumSize = 0
       var partitionIndex = 0
-      while (i < n1) {
-        var j = i
-        val tail = mutable.ListBuffer[T]()
-        // Keep appending to the current tail until appended a head of size w1.
-        while (j < n1 && nextHeads(j).size < w1) {
-          tail ++= nextHeads(j)
-          j += 1
+      while (i < n) {
+        val mod = cumSize % step
+        val offset = if (mod == 0) 0 else step - mod
+        val size = sizes(i)
+        if (offset < size) {
+          val tail = mutable.ListBuffer.empty[T]
+          // Keep appending to the current tail until it has w1 elements.
+          var j = i + 1
+          while (j < n && tail.length < w1) {
+            tail ++= heads(j).take(w1 - tail.length)
+            j += 1
+          }
+          if (sizes(i) + tail.length >= offset + windowSize) {
+            partitions +=
+              new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), 
tail, offset)
+            partitionIndex += 1
+          }
         }
-        if (j < n1) {
-          tail ++= nextHeads(j)
-          j += 1
-        }
-        partitions += new SlidingRDDPartition[T](partitionIndex, 
parentPartitions(i), tail)
-        partitionIndex += 1
-        // Skip appended heads.
-        i = j
-      }
-      // If the head of last partition has size w1, we also need to add this 
partition.
-      if (nextHeads.last.size == w1) {
-        partitions += new SlidingRDDPartition[T](partitionIndex, 
parentPartitions(n1), Seq.empty)
+        cumSize += size
+        i += 1
       }
       partitions.toArray
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/dba1a62c/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
index bc64172..ac93733 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
@@ -28,9 +28,12 @@ class RDDFunctionsSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     for (numPartitions <- 1 to 8) {
       val rdd = sc.parallelize(data, numPartitions)
       for (windowSize <- 1 to 6) {
-        val sliding = rdd.sliding(windowSize).collect().map(_.toList).toList
-        val expected = data.sliding(windowSize).map(_.toList).toList
-        assert(sliding === expected)
+        for (step <- 1 to 3) {
+          val sliding = rdd.sliding(windowSize, 
step).collect().map(_.toList).toList
+          val expected = data.sliding(windowSize, step)
+            .map(_.toList).toList.filter(l => l.size == windowSize)
+          assert(sliding === expected)
+        }
       }
       assert(rdd.sliding(7).collect().isEmpty,
         "Should return an empty RDD if the window size is greater than the 
number of items.")
@@ -40,7 +43,7 @@ class RDDFunctionsSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   test("sliding with empty partitions") {
     val data = Seq(Seq(1, 2, 3), Seq.empty[Int], Seq(4), Seq.empty[Int], 
Seq(5, 6, 7))
     val rdd = sc.parallelize(data, data.length).flatMap(s => s)
-    assert(rdd.partitions.size === data.length)
+    assert(rdd.partitions.length === data.length)
     val sliding = rdd.sliding(3).collect().toSeq.map(_.toSeq)
     val expected = data.flatMap(x => x).sliding(3).toSeq.map(_.toSeq)
     assert(sliding === expected)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to