Repository: spark
Updated Branches:
  refs/heads/branch-1.6 5cc1e2cec -> 0b8bdf793


[SPARK-8428][SPARK-13850] Fix integer overflows in TimSort

This patch fixes a few integer overflows in `UnsafeSortDataFormat.copyRange()` 
and `ShuffleSortDataFormat.copyRange()` that appear to be the most likely cause 
of a number of `TimSort` contract-violation errors seen in Spark 2.0 and 
Spark 1.6 when sorting large datasets.
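
For readers skimming the diff below, here is a minimal, self-contained Java 
sketch of the failure mode (illustrative only; the class and variable names are 
made up and are not Spark code): with an `int` position, `srcPos * 8` is 
evaluated in 32-bit arithmetic and wraps to a negative value before it is 
widened, whereas the `8L` literal used in the fix forces the multiplication 
into 64-bit arithmetic.

    // Illustrative only -- not part of this commit.
    public class CopyRangeOverflowSketch {
      public static void main(String[] args) {
        int srcPos = 270_000_000;       // a plausible position in a very large sort buffer

        long badOffset  = srcPos * 8;   // int * int overflows; the negative result is then widened
        long goodOffset = srcPos * 8L;  // srcPos is promoted to long before the multiply

        System.out.println(badOffset);  // -2134967296
        System.out.println(goodOffset); // 2160000000
      }
    }

A negative byte offset handed to `Platform.copyMemory()` copies the wrong 
region of the buffer, which is consistent with the corrupted orderings that 
`TimSort` then reports as contract violations.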

Added a test in `ExternalSorterSuite` that instantiates a large array of the 
form [150000000, 150000001, 150000002, ..., 300000000, 0, 1, 2, ..., 
149999999], which triggers a `copyRange()` in `TimSort.mergeLo` or 
`TimSort.mergeHi`. Note that the input dataset must contain at least 268.43 
million rows with a particular data distribution for the overflow to occur.
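
As a rough sanity check on that figure (a sketch under the assumption of the 
8-byte-per-record layout implied by `ShuffleSortDataFormat`'s offsets), the 
smallest position whose byte offset no longer fits in a 32-bit int is 
Integer.MAX_VALUE / 8 + 1 = 268435456, i.e. roughly 268.43 million:

    // Hypothetical check, not part of the patch: the first index whose 8-byte
    // offset exceeds Integer.MAX_VALUE and therefore wraps in int arithmetic.
    public class ThresholdCheck {
      public static void main(String[] args) {
        long threshold = Integer.MAX_VALUE / 8 + 1;  // 268435456 (~268.43 million)
        System.out.println((int) (threshold * 8));   // -2147483648, what 32-bit arithmetic would yield
        System.out.println(threshold * 8L);          // 2147483648, stays positive as a long
      }
    }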

Author: Sameer Agarwal <sam...@databricks.com>

Closes #13336 from sameeragarwal/timsort-bug.

(cherry picked from commit fe6de16f781ff659b34e0ddda427d371d3d94536)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b8bdf79
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b8bdf79
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b8bdf79

Branch: refs/heads/branch-1.6
Commit: 0b8bdf793a98296fd1ac1fc499946929c6a5959d
Parents: 5cc1e2c
Author: Sameer Agarwal <sam...@databricks.com>
Authored: Thu May 26 15:49:16 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu May 26 15:50:50 2016 -0700

----------------------------------------------------------------------
 .../shuffle/sort/ShuffleSortDataFormat.java     |  6 ++---
 .../unsafe/sort/UnsafeSortDataFormat.java       |  6 ++---
 .../util/collection/ExternalSorterSuite.scala   | 25 +++++++++++++++++++-
 3 files changed, 30 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b8bdf79/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
index 8f4e322..1e924d2 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
@@ -61,10 +61,10 @@ final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, Lo
   public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
     Platform.copyMemory(
       src.getBaseObject(),
-      src.getBaseOffset() + srcPos * 8,
+      src.getBaseOffset() + srcPos * 8L,
       dst.getBaseObject(),
-      dst.getBaseOffset() + dstPos * 8,
-      length * 8
+      dst.getBaseOffset() + dstPos * 8L,
+      length * 8L
     );
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b8bdf79/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
index d3137f5..1eac329 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
@@ -73,10 +73,10 @@ final class UnsafeSortDataFormat extends SortDataFormat<RecordPointerAndKeyPrefi
   public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
     Platform.copyMemory(
       src.getBaseObject(),
-      src.getBaseOffset() + srcPos * 16,
+      src.getBaseOffset() + srcPos * 16L,
       dst.getBaseObject(),
-      dst.getBaseOffset() + dstPos * 16,
-      length * 16);
+      dst.getBaseOffset() + dstPos * 16L,
+      length * 16L);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/0b8bdf79/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index d7b2d07..dae83fe 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -17,13 +17,17 @@
 
 package org.apache.spark.util.collection
 
-import org.apache.spark.memory.MemoryTestingUtils
+import java.util.Comparator
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
 import org.apache.spark._
+import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.unsafe.array.LongArray
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat}
 
 
 class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
@@ -95,6 +99,25 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)(
     sortWithoutBreakingSortingContracts)
 
+  // This test is ignored by default as it requires a fairly large heap size (16GB)
+  ignore("sort without breaking timsort contracts for large arrays") {
+    val size = 300000000
+    // To manifest the bug observed in SPARK-8428 and SPARK-13850, we explicitly use an array of
+    // the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999]
+    // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi()
+    val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i }
+    val buf = new LongArray(MemoryBlock.fromLongArray(ref))
+
+    new Sorter(UnsafeSortDataFormat.INSTANCE).sort(
+      buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] {
+        override def compare(
+            r1: RecordPointerAndKeyPrefix,
+            r2: RecordPointerAndKeyPrefix): Int = {
+          PrefixComparators.LONG.compare(r1.keyPrefix, r2.keyPrefix)
+        }
+      })
+  }
+
   test("spilling with hash collisions") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true, kryo = false)

