Repository: spark Updated Branches: refs/heads/branch-1.6 5cc1e2cec -> 0b8bdf793
[SPARK-8428][SPARK-13850] Fix integer overflows in TimSort This patch fixes a few integer overflows in `UnsafeSortDataFormat.copyRange()` and `ShuffleSortDataFormat.copyRange()` that seem to be the most likely cause behind a number of `TimSort` contract violation errors seen in Spark 2.0 and Spark 1.6 while sorting large datasets. Added a test in `ExternalSorterSuite` that instantiates a large array of the form [150000000, 150000001, 150000002, ..., 300000000, 0, 1, 2, ..., 149999999] that triggers a `copyRange` in `TimSort.mergeLo` or `TimSort.mergeHi`. Note that the input dataset should contain at least 268.43 million rows with a certain data distribution for an overflow to occur. Author: Sameer Agarwal <sam...@databricks.com> Closes #13336 from sameeragarwal/timsort-bug. (cherry picked from commit fe6de16f781ff659b34e0ddda427d371d3d94536) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b8bdf79 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b8bdf79 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b8bdf79 Branch: refs/heads/branch-1.6 Commit: 0b8bdf793a98296fd1ac1fc499946929c6a5959d Parents: 5cc1e2c Author: Sameer Agarwal <sam...@databricks.com> Authored: Thu May 26 15:49:16 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Thu May 26 15:50:50 2016 -0700 ---------------------------------------------------------------------- .../shuffle/sort/ShuffleSortDataFormat.java | 6 ++--- .../unsafe/sort/UnsafeSortDataFormat.java | 6 ++--- .../util/collection/ExternalSorterSuite.scala | 25 +++++++++++++++++++- 3 files changed, 30 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0b8bdf79/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java index 8f4e322..1e924d2 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java @@ -61,10 +61,10 @@ final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, Lo public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) { Platform.copyMemory( src.getBaseObject(), - src.getBaseOffset() + srcPos * 8, + src.getBaseOffset() + srcPos * 8L, dst.getBaseObject(), - dst.getBaseOffset() + dstPos * 8, - length * 8 + dst.getBaseOffset() + dstPos * 8L, + length * 8L ); } http://git-wip-us.apache.org/repos/asf/spark/blob/0b8bdf79/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java index d3137f5..1eac329 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java @@ -73,10 +73,10 @@ final class UnsafeSortDataFormat extends SortDataFormat<RecordPointerAndKeyPrefi public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) { Platform.copyMemory( src.getBaseObject(), - src.getBaseOffset() + srcPos * 16, + src.getBaseOffset() + srcPos * 16L, dst.getBaseObject(), - dst.getBaseOffset() + dstPos * 16, - length * 16); + dst.getBaseOffset() + dstPos * 16L, + length * 16L); } @Override 
http://git-wip-us.apache.org/repos/asf/spark/blob/0b8bdf79/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index d7b2d07..dae83fe 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -17,13 +17,17 @@ package org.apache.spark.util.collection -import org.apache.spark.memory.MemoryTestingUtils +import java.util.Comparator import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark._ +import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.unsafe.array.LongArray +import org.apache.spark.unsafe.memory.MemoryBlock +import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat} class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { @@ -95,6 +99,25 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)( sortWithoutBreakingSortingContracts) + // This test is ignored by default as it requires a fairly large heap size (16GB) + ignore("sort without breaking timsort contracts for large arrays") { + val size = 300000000 + // To manifest the bug observed in SPARK-8428 and SPARK-13850, we explicitly use an array of + // the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999] + // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi() + val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i } + val buf = new 
LongArray(MemoryBlock.fromLongArray(ref)) + + new Sorter(UnsafeSortDataFormat.INSTANCE).sort( + buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] { + override def compare( + r1: RecordPointerAndKeyPrefix, + r2: RecordPointerAndKeyPrefix): Int = { + PrefixComparators.LONG.compare(r1.keyPrefix, r2.keyPrefix) + } + }) + } + test("spilling with hash collisions") { val size = 1000 val conf = createSparkConf(loadDefaults = true, kryo = false) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org