Repository: spark
Updated Branches:
  refs/heads/branch-2.0 995f602d2 -> 4131623a8
[SPARK-18003][SPARK CORE] Fix bug of RDD zipWithIndex & zipWithUniqueId index value overflowing

## What changes were proposed in this pull request?

- Fix a bug where RDD `zipWithIndex` generates wrong results when one partition contains more than 2147483647 records.
- Fix a bug where RDD `zipWithUniqueId` generates wrong results when one partition contains more than 2147483647 records.

## How was this patch tested?

A test was added.

Author: WeichenXu <weichenxu...@outlook.com>

Closes #15550 from WeichenXu123/fix_rdd_zipWithIndex_overflow.

(cherry picked from commit 39755169fb5bb07332eef263b4c18ede1528812d)
Signed-off-by: Reynold Xin <r...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4131623a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4131623a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4131623a

Branch: refs/heads/branch-2.0
Commit: 4131623a8585fe99f79d82c24ab3b8b506d0d616
Parents: 995f602
Author: WeichenXu <weichenxu...@outlook.com>
Authored: Wed Oct 19 23:41:38 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Oct 19 23:41:46 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala    |  2 +-
 .../org/apache/spark/rdd/ZippedWithIndexRDD.scala     |  5 ++---
 .../src/main/scala/org/apache/spark/util/Utils.scala  | 15 +++++++++++++++
 .../scala/org/apache/spark/util/UtilsSuite.scala      |  7 +++++++
 4 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4131623a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 34d32aa..7013396 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1278,7 +1278,7 @@ abstract class RDD[T: ClassTag](
   def zipWithUniqueId(): RDD[(T, Long)] = withScope {
     val n = this.partitions.length.toLong
     this.mapPartitionsWithIndex { case (k, iter) =>
-      iter.zipWithIndex.map { case (item, i) =>
+      Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
         (item, i * n + k)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4131623a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 32931d5..dff6737 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -64,8 +64,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
 
   override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
     val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
-    firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
-      (x._1, split.startIndex + x._2)
-    }
+    val parentIter = firstParent[T].iterator(split.prev, context)
+    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
   }
 }
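For context on the bug: Scala's Iterator#zipWithIndex counts with an Int, so once a partition holds more than 2147483647 records the index wraps to -2147483648. In zipWithUniqueId the wrapped index i then feeds the id formula i * n + k (item i of partition k, with n partitions in total), producing negative, non-unique ids. The standalone sketch below illustrates the Long-based indexing the patch introduces; it is an illustration only, not Spark source, and the object and method names here are invented for the example.

object LongIndexSketch {
  // Long-based counterpart of Iterator#zipWithIndex, mirroring the
  // Utils.getIteratorZipWithIndex helper added by this commit.
  def zipWithLongIndex[T](iter: Iterator[T], startIndex: Long): Iterator[(T, Long)] =
    new Iterator[(T, Long)] {
      private var index: Long = startIndex - 1L
      def hasNext: Boolean = iter.hasNext
      def next(): (T, Long) = {
        index += 1L
        (iter.next(), index)
      }
    }

  def main(args: Array[String]): Unit = {
    // Start just below Int.MaxValue so the index crosses the Int boundary.
    val start = Int.MaxValue.toLong - 1L
    zipWithLongIndex(Iterator("a", "b", "c"), start).foreach(println)
    // prints (a,2147483646), (b,2147483647), (c,2147483648); no wrap-around
  }
}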
http://git-wip-us.apache.org/repos/asf/spark/blob/4131623a/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3d862f4..1686edb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1768,6 +1768,21 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Generate a zipWithIndex iterator, avoiding the index overflow problem
+   * in Scala's own zipWithIndex.
+   */
+  def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
+    new Iterator[(T, Long)] {
+      var index: Long = startIndex - 1L
+      def hasNext: Boolean = iterator.hasNext
+      def next(): (T, Long) = {
+        index += 1L
+        (iterator.next(), index)
+      }
+    }
+  }
+
+  /**
    * Creates a symlink.
    *
    * @param src absolute path to the source

http://git-wip-us.apache.org/repos/asf/spark/blob/4131623a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 2741ad7..b67482a 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -396,6 +396,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.getIteratorSize(iterator) === 5L)
   }
 
+  test("getIteratorZipWithIndex") {
+    val iterator = Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L + Int.MaxValue)
+    assert(iterator.toArray === Array(
+      (0, -1L + Int.MaxValue), (1, 0L + Int.MaxValue), (2, 1L + Int.MaxValue)
+    ))
+  }
+
   test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
     val parent: File = Utils.createTempDir()
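Finally, a quick usage sketch of the new helper, mirroring the test above. Since Utils is private[spark], assume this is evaluated from code inside the org.apache.spark package tree on a Spark classpath, for example in a test or worksheet:

import org.apache.spark.util.Utils

// Start indexing at Int.MaxValue - 1 so the Long index crosses the Int
// boundary without wrapping, exactly as the new test asserts.
val indexed = Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L + Int.MaxValue)
indexed.foreach { case (item, idx) => println(s"$item -> $idx") }
// 0 -> 2147483646
// 1 -> 2147483647
// 2 -> 2147483648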