Updated Branches:
  refs/heads/master 68b2c0d02 -> e733d655d

Merge pull request #578 from mengxr/rank.

SPARK-1076: zipWithIndex and zipWithUniqueId to RDD

Assigning ranks to an ordered or unordered data set is a common operation. This 
could be done by first counting records in each partition and then assigning 
ranks in parallel.

The purpose of assigning ranks to an unordered set is usually to get a unique 
id for each item, e.g., to map feature names to feature indices. In such cases, 
the assignment could be done without counting records, saving one spark job.

https://spark-project.atlassian.net/browse/SPARK-1076

== update ==
Because assigning ranks is very similar to Scala's zipWithIndex, I changed the 
method name to zipWithIndex and put the index in the value field.

Author: Xiangrui Meng <m...@databricks.com>

Closes #578 and squashes the following commits:

52a05e1 [Xiangrui Meng] changed assignRanks to zipWithIndex changed 
assignUniqueIds to zipWithUniqueId minor updates
756881c [Xiangrui Meng] simplified RankedRDD by implementing assignUniqueIds 
separately moved counting iterator size to Utils do not count items in the last 
partition and skip counting if there is only one partition
630868c [Xiangrui Meng] newline
21b434b [Xiangrui Meng] add assignRanks and assignUniqueIds to RDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e733d655
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e733d655
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e733d655

Branch: refs/heads/master
Commit: e733d655df6bf569d3d16fdd65c11ef3d2b9de16
Parents: 68b2c0d
Author: Xiangrui Meng <m...@databricks.com>
Authored: Wed Feb 12 00:42:42 2014 -0800
Committer: Reynold Xin <r...@apache.org>
Committed: Wed Feb 12 00:42:42 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 36 ++++++----
 .../apache/spark/rdd/ZippedWithIndexRDD.scala   | 69 ++++++++++++++++++++
 .../scala/org/apache/spark/util/Utils.scala     | 13 ++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 26 ++++++++
 .../org/apache/spark/util/UtilsSuite.scala      |  7 ++
 5 files changed, 139 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 8010bb6..ec8e311 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -775,18 +775,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return the number of elements in the RDD.
    */
-  def count(): Long = {
-    sc.runJob(this, (iter: Iterator[T]) => {
-      // Use a while loop to count the number of elements rather than 
iter.size because
-      // iter.size uses a for loop, which is slightly slower in current 
version of Scala.
-      var result = 0L
-      while (iter.hasNext) {
-        result += 1L
-        iter.next()
-      }
-      result
-    }).sum
-  }
+  def count(): Long = {
+    // Utils.getIteratorSize yields one size per partition; add them up.
+    val partitionSizes = sc.runJob(this, Utils.getIteratorSize _)
+    partitionSizes.sum
+  }
 
   /**
    * (Experimental) Approximate version of count() that returns a potentially 
incomplete result
@@ -870,6 +859,29 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Pairs every element of this RDD with its position in the RDD. Positions
+   * follow the partition index first and then the order of items inside each
+   * partition, so the first item of the first partition gets index 0 and the
+   * last item of the last partition gets the largest index. This is similar
+   * to Scala's zipWithIndex, but the index type is Long instead of Int.
+   * This method needs to trigger a spark job when this RDD contains more
+   * than one partition.
+   */
+  def zipWithIndex(): RDD[(T, Long)] = {
+    new ZippedWithIndexRDD(this)
+  }
+
+  /**
+   * Zips this RDD with generated unique Long ids. Items in the kth partition
+   * get ids k, n+k, 2*n+k, ..., where n is the number of partitions. The ids
+   * may therefore have gaps, but unlike
+   * [[org.apache.spark.rdd.RDD#zipWithIndex]] this method does not trigger a
+   * spark job.
+   */
+  def zipWithUniqueId(): RDD[(T, Long)] = {
+    // Widen to Long up front: with an Int n, `i * n + k` would be evaluated
+    // in Int arithmetic and could overflow before being widened to Long.
+    val n = this.partitions.size.toLong
+    this.mapPartitionsWithIndex { case (k, iter) =>
+      // Track the position within the partition as a Long as well, because
+      // Iterator.zipWithIndex hands out Int indices.
+      var i = -1L
+      iter.map { item =>
+        i += 1L
+        (item, i * n + k)
+      }
+    }
+  }
+
+  /**
    * Take the first num elements of the RDD. It works by first scanning one 
partition, and use the
    * results from that partition to estimate the number of additional 
partitions needed to satisfy
    * the limit.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
new file mode 100644
index 0000000..5e08a46
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.util.Utils
+
+/**
+ * Partition of a ZippedWithIndexRDD: wraps a partition of the parent RDD and
+ * records the index assigned to that partition's first element.
+ *
+ * @param prev partition of the parent RDD wrapped by this partition
+ * @param startIndex index given to the first element of `prev`
+ */
+private[spark] class ZippedWithIndexRDDPartition(
+    val prev: Partition,
+    val startIndex: Long)
+  extends Partition with Serializable {
+
+  override val index: Int = prev.index
+}
+
+/**
+ * Represents an RDD zipped with its element indices. The ordering is first
+ * based on the partition index and then the ordering of items within each
+ * partition. So the first item in the first partition gets index 0, and the
+ * last item in the last partition receives the largest index.
+ *
+ * @param prev parent RDD
+ * @tparam T parent RDD item type
+ */
+private[spark]
+class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T])
+  extends RDD[(T, Long)](prev) {
+
+  /**
+   * Wraps each parent partition together with the index of its first
+   * element. With two or more partitions this runs a job that counts the
+   * elements of every partition except the last one.
+   */
+  override def getPartitions: Array[Partition] = {
+    val n = prev.partitions.size
+    val startIndices: Array[Long] =
+      if (n == 0) {
+        Array[Long]()
+      } else if (n == 1) {
+        Array(0L)
+      } else {
+        // The running totals of the per-partition counts are the start
+        // indices: partition p starts at the sum of the sizes before it.
+        prev.context.runJob(
+          prev,
+          Utils.getIteratorSize _,
+          0 until n - 1, // do not need to count the last partition
+          false
+        ).scanLeft(0L)(_ + _)
+      }
+    firstParent[T].partitions.map { x =>
+      new ZippedWithIndexRDDPartition(x, startIndices(x.index))
+    }
+  }
+
+  /** Delegates to the parent RDD's preferred locations for the wrapped partition. */
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val zipped = split.asInstanceOf[ZippedWithIndexRDDPartition]
+    firstParent[T].preferredLocations(zipped.prev)
+  }
+
+  /**
+   * Pairs each item of the wrapped parent partition with its index, counting
+   * upwards from the partition's start index.
+   */
+  override def compute(
+      splitIn: Partition,
+      context: TaskContext): Iterator[(T, Long)] = {
+    val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
+    // Iterator.zipWithIndex counts with an Int, which would wrap around for
+    // a partition holding more than Int.MaxValue items; count in a Long.
+    var lastIndex = split.startIndex - 1L
+    firstParent[T].iterator(split.prev, context).map { item =>
+      lastIndex += 1L
+      (item, lastIndex)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c201d0a..8749ab7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -855,4 +855,17 @@ private[spark] object Utils extends Logging {
     System.currentTimeMillis - start
   }
 
+  /**
+   * Returns how many elements the given iterator yields, consuming it in the
+   * process. A while loop is used rather than calling
+   * [[scala.collection.Iterator#size]] because the latter uses a for loop,
+   * which is slightly slower in the current version of Scala.
+   */
+  def getIteratorSize[T](iterator: Iterator[T]): Long = {
+    var seen = 0L
+    while (iterator.hasNext) {
+      iterator.next()
+      seen += 1L
+    }
+    seen
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 879c4e5..308c7cc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -525,4 +525,30 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(a.intersection(b).collect.sorted === intersection)
     assert(b.intersection(a).collect.sorted === intersection)
   }
+
+  test("zipWithIndex") {
+    // Items 0 until 10 spread over 3 partitions: each item equals its index.
+    val numItems = 10
+    val indexed = sc.parallelize(0 until numItems, 3).zipWithIndex()
+    indexed.collect().foreach { case (item, index) =>
+      assert(item === index)
+    }
+  }
+
+  test("zipWithIndex with a single partition") {
+    // Same check as the multi-partition test, but with only one partition.
+    val numItems = 10
+    val indexed = sc.parallelize(0 until numItems, 1).zipWithIndex()
+    indexed.collect().foreach { case (item, index) =>
+      assert(item === index)
+    }
+  }
+
+  test("zipWithUniqueId") {
+    val n = 10
+    val data = sc.parallelize(0 until n, 3)
+    val ranked = data.zipWithUniqueId()
+    // Check the generated ids (second field). The input items are already
+    // distinct, so testing the first field would pass even if ids collided.
+    val ids = ranked.map(_._2).distinct().collect()
+    assert(ids.length === n)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 4684c8c..7030ba4 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -146,5 +146,12 @@ class UtilsSuite extends FunSuite {
     assert(bbuf.array.length === 8)
     assert(Utils.deserializeLongValue(bbuf.array) === testval)
   }
+
+  test("get iterator size") {
+    // An iterator with no elements has size 0.
+    val noElements = Seq[Int]().toIterator
+    assert(Utils.getIteratorSize(noElements) === 0L)
+    // A range of five elements has size 5.
+    val fiveElements = Iterator.range(0, 5)
+    assert(Utils.getIteratorSize(fiveElements) === 5L)
+  }
 }
 

Reply via email to