Repository: spark
Updated Branches:
  refs/heads/master 2d000124b -> 3b364ff0a
[SPARK-11078] Ensure spilling tests actually spill #9084 uncovered that many tests that test spilling don't actually spill. This is a follow-up patch to fix that to ensure our unit tests actually catch potential bugs in spilling. The size of this patch is inflated by the refactoring of `ExternalSorterSuite`, which had a lot of duplicate code and logic. Author: Andrew Or <and...@databricks.com> Closes #9124 from andrewor14/spilling-tests. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b364ff0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b364ff0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b364ff0 Branch: refs/heads/master Commit: 3b364ff0a4f38c2b8023429a55623de32be5f329 Parents: 2d00012 Author: Andrew Or <and...@databricks.com> Authored: Thu Oct 15 14:50:01 2015 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Thu Oct 15 14:50:01 2015 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/TestUtils.scala | 51 ++ .../spark/shuffle/ShuffleMemoryManager.scala | 6 +- .../util/collection/ExternalAppendOnlyMap.scala | 6 + .../spark/util/collection/Spillable.scala | 37 +- .../org/apache/spark/DistributedSuite.scala | 39 +- .../collection/ExternalAppendOnlyMapSuite.scala | 103 ++- .../util/collection/ExternalSorterSuite.scala | 871 ++++++++----------- .../execution/TestShuffleMemoryManager.scala | 2 + 8 files changed, 534 insertions(+), 581 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3b364ff0/core/src/main/scala/org/apache/spark/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 888763a..acfe751 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -24,10 +24,14 @@ import java.util.Arrays import java.util.jar.{JarEntry, JarOutputStream} import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import com.google.common.io.{ByteStreams, Files} import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler._ import org.apache.spark.util.Utils /** @@ -154,4 +158,51 @@ private[spark] object TestUtils { " @Override public String toString() { return \"" + toStringValue + "\"; }}") createCompiledClass(className, destDir, sourceFile, classpathUrls) } + + /** + * Run some code involving jobs submitted to the given context and assert that the jobs spilled. + */ + def assertSpilled[T](sc: SparkContext, identifier: String)(body: => T): Unit = { + val spillListener = new SpillListener + sc.addSparkListener(spillListener) + body + assert(spillListener.numSpilledStages > 0, s"expected $identifier to spill, but did not") + } + + /** + * Run some code involving jobs submitted to the given context and assert that the jobs + * did not spill. 
+ */ + def assertNotSpilled[T](sc: SparkContext, identifier: String)(body: => T): Unit = { + val spillListener = new SpillListener + sc.addSparkListener(spillListener) + body + assert(spillListener.numSpilledStages == 0, s"expected $identifier to not spill, but did") + } + +} + + +/** + * A [[SparkListener]] that detects whether spills have occurred in Spark jobs. + */ +private class SpillListener extends SparkListener { + private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]] + private val spilledStageIds = new mutable.HashSet[Int] + + def numSpilledStages: Int = spilledStageIds.size + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + stageIdToTaskMetrics.getOrElseUpdate( + taskEnd.stageId, new ArrayBuffer[TaskMetrics]) += taskEnd.taskMetrics + } + + override def onStageCompleted(stageComplete: SparkListenerStageCompleted): Unit = { + val stageId = stageComplete.stageInfo.stageId + val metrics = stageIdToTaskMetrics.remove(stageId).toSeq.flatten + val spilled = metrics.map(_.memoryBytesSpilled).sum > 0 + if (spilled) { + spilledStageIds += stageId + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/3b364ff0/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index aaf543c..9bd18da 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -139,8 +139,10 @@ class ShuffleMemoryManager protected ( throw new SparkException( s"Internal error: release called on $numBytes bytes but task only has $curMem") } - taskMemory(taskAttemptId) -= numBytes - memoryManager.releaseExecutionMemory(numBytes) + if (taskMemory.contains(taskAttemptId)) { + taskMemory(taskAttemptId) -= numBytes + memoryManager.releaseExecutionMemory(numBytes) + } memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory has been freed } http://git-wip-us.apache.org/repos/asf/spark/blob/3b364ff0/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 6a96b5d..cfa58f5 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -96,6 +96,12 @@ class ExternalAppendOnlyMap[K, V, C]( private val ser = serializer.newInstance() /** + * Number of files this map has spilled so far. + * Exposed for testing. + */ + private[collection] def numSpills: Int = spilledMaps.size + + /** * Insert the given key and value into the map. 
*/ def insert(key: K, value: V): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/3b364ff0/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index 747ecf0..d2a68ca 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -43,10 +43,15 @@ private[spark] trait Spillable[C] extends Logging { private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager // Initial threshold for the size of a collection before we start tracking its memory usage - // Exposed for testing + // For testing only private[this] val initialMemoryThreshold: Long = SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024) + // Force this collection to spill when there are this many elements in memory + // For testing only + private[this] val numElementsForceSpillThreshold: Long = + SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MaxValue) + // Threshold for this collection's size in bytes before we start tracking its memory usage // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0 private[this] var myMemoryThreshold = initialMemoryThreshold @@ -69,27 +74,27 @@ private[spark] trait Spillable[C] extends Logging { * @return true if `collection` was spilled to disk; false otherwise */ protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { + var shouldSpill = false if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold val granted = shuffleMemoryManager.tryToAcquire(amountToRequest) myMemoryThreshold += granted - if (myMemoryThreshold <= currentMemory) { - // We were granted too little memory to grow further (either tryToAcquire returned 0, - // or we already had more memory than myMemoryThreshold); spill the current collection - _spillCount += 1 - logSpillage(currentMemory) - - spill(collection) - - _elementsRead = 0 - // Keep track of spills, and release memory - _memoryBytesSpilled += currentMemory - releaseMemoryForThisThread() - return true - } + // If we were granted too little memory to grow further (either tryToAcquire returned 0, + // or we already had more memory than myMemoryThreshold), spill the current collection + shouldSpill = currentMemory >= myMemoryThreshold + } + shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold + // Actually spill + if (shouldSpill) { + _spillCount += 1 + logSpillage(currentMemory) + spill(collection) + _elementsRead = 0 + _memoryBytesSpilled += currentMemory + releaseMemoryForThisThread() } - false + shouldSpill } /** http://git-wip-us.apache.org/repos/asf/spark/blob/3b364ff0/core/src/test/scala/org/apache/spark/DistributedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 34a4bb9..1c3f2bc 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -203,22 +203,35 @@ class 
DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex } test("compute without caching when no partitions fit in memory") { - sc = new SparkContext(clusterUrl, "test") - // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache - // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory - val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER) - assert(data.count() === 4000000) - assert(data.count() === 4000000) - assert(data.count() === 4000000) + val size = 10000 + val conf = new SparkConf() + .set("spark.storage.unrollMemoryThreshold", "1024") + .set("spark.testing.memory", (size / 2).toString) + sc = new SparkContext(clusterUrl, "test", conf) + val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY) + assert(data.count() === size) + assert(data.count() === size) + assert(data.count() === size) + // ensure only a subset of partitions were cached + val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true) + assert(rddBlocks.size === 0, s"expected no RDD blocks, found ${rddBlocks.size}") } test("compute when only some partitions fit in memory") { - sc = new SparkContext(clusterUrl, "test", new SparkConf) - // TODO: verify that only a subset of partitions fit in memory (SPARK-11078) - val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER) - assert(data.count() === 4000000) - assert(data.count() === 4000000) - assert(data.count() === 4000000) + val size = 10000 + val numPartitions = 10 + val conf = new SparkConf() + .set("spark.storage.unrollMemoryThreshold", "1024") + .set("spark.testing.memory", (size * numPartitions).toString) + sc = new SparkContext(clusterUrl, "test", conf) + val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY) + assert(data.count() === size) + assert(data.count() === size) + assert(data.count() === size) + // ensure only a subset of partitions were cached + val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true) + assert(rddBlocks.size > 0, "no RDD blocks found") + assert(rddBlocks.size < numPartitions, s"too many RDD blocks found, expected <$numPartitions") } test("passing environment variables to cluster") { http://git-wip-us.apache.org/repos/asf/spark/blob/3b364ff0/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 0a03c32..5cb506e 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -22,9 +22,10 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark._ import org.apache.spark.io.CompressionCodec -// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078) class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { + import TestUtils.{assertNotSpilled, assertSpilled} + private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS private def createCombiner[T](i: T) = ArrayBuffer[T](i) private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i @@ -244,54 +245,53 @@ class ExternalAppendOnlyMapSuite 
extends SparkFunSuite with LocalSparkContext { * If a compression codec is provided, use it. Otherwise, do not compress spills. */ private def testSimpleSpilling(codec: Option[String] = None): Unit = { + val size = 1000 val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home + conf.set("spark.shuffle.manager", "hash") // avoid using external sorter + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) - // reduceByKey - should spill ~8 times - val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i)) - val resultA = rddA.reduceByKey(math.max).collect() - assert(resultA.length === 50000) - resultA.foreach { case (k, v) => - assert(v === k * 2 + 1, s"Value for $k was wrong: expected ${k * 2 + 1}, got $v") + assertSpilled(sc, "reduceByKey") { + val result = sc.parallelize(0 until size) + .map { i => (i / 2, i) }.reduceByKey(math.max).collect() + assert(result.length === size / 2) + result.foreach { case (k, v) => + val expected = k * 2 + 1 + assert(v === expected, s"Value for $k was wrong: expected $expected, got $v") + } } - // groupByKey - should spill ~17 times - val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i)) - val resultB = rddB.groupByKey().collect() - assert(resultB.length === 25000) - resultB.foreach { case (i, seq) => - val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3) - assert(seq.toSet === expected, - s"Value for $i was wrong: expected $expected, got ${seq.toSet}") + assertSpilled(sc, "groupByKey") { + val result = sc.parallelize(0 until size).map { i => (i / 2, i) }.groupByKey().collect() + assert(result.length == size / 2) + result.foreach { case (i, seq) => + val actual = seq.toSet + val expected = Set(i * 2, i * 2 + 1) + assert(actual === expected, s"Value for $i was wrong: expected $expected, got $actual") + } } - // cogroup - should spill ~7 times - val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i)) - val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i)) - val resultC = rddC1.cogroup(rddC2).collect() - assert(resultC.length === 10000) - resultC.foreach { case (i, (seq1, seq2)) => - i match { - case 0 => - assert(seq1.toSet === Set[Int](0)) - assert(seq2.toSet === Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000)) - case 1 => - assert(seq1.toSet === Set[Int](1)) - assert(seq2.toSet === Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001)) - case 5000 => - assert(seq1.toSet === Set[Int](5000)) - assert(seq2.toSet === Set[Int]()) - case 9999 => - assert(seq1.toSet === Set[Int](9999)) - assert(seq2.toSet === Set[Int]()) - case _ => + assertSpilled(sc, "cogroup") { + val rdd1 = sc.parallelize(0 until size).map { i => (i / 2, i) } + val rdd2 = sc.parallelize(0 until size).map { i => (i / 2, i) } + val result = rdd1.cogroup(rdd2).collect() + assert(result.length === size / 2) + result.foreach { case (i, (seq1, seq2)) => + val actual1 = seq1.toSet + val actual2 = seq2.toSet + val expected = Set(i * 2, i * 2 + 1) + assert(actual1 === expected, s"Value 1 for $i was wrong: expected $expected, got $actual1") + assert(actual2 === expected, s"Value 2 for $i was wrong: expected $expected, got $actual2") } } + sc.stop() } test("spilling with hash collisions") { + val size = 1000 val conf = createSparkConf(loadDefaults = true) + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = 
createExternalMap[String] @@ -315,11 +315,12 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(w1.hashCode === w2.hashCode) } - map.insertAll((1 to 100000).iterator.map(_.toString).map(i => (i, i))) + map.insertAll((1 to size).iterator.map(_.toString).map(i => (i, i))) collisionPairs.foreach { case (w1, w2) => map.insert(w1, w2) map.insert(w2, w1) } + assert(map.numSpills > 0, "map did not spill") // A map of collision pairs in both directions val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap @@ -334,22 +335,25 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(kv._2.equals(expectedValue)) count += 1 } - assert(count === 100000 + collisionPairs.size * 2) + assert(count === size + collisionPairs.size * 2) sc.stop() } test("spilling with many hash collisions") { + val size = 1000 val conf = createSparkConf(loadDefaults = true) + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _) // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes // problems if the map fails to group together the objects with the same code (SPARK-2043). for (i <- 1 to 10) { - for (j <- 1 to 10000) { + for (j <- 1 to size) { map.insert(FixedHashObject(j, j % 2), 1) } } + assert(map.numSpills > 0, "map did not spill") val it = map.iterator var count = 0 @@ -358,17 +362,20 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(kv._2 === 10) count += 1 } - assert(count === 10000) + assert(count === size) sc.stop() } test("spilling with hash collisions using the Int.MaxValue key") { + val size = 1000 val conf = createSparkConf(loadDefaults = true) + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = createExternalMap[Int] - (1 to 100000).foreach { i => map.insert(i, i) } + (1 to size).foreach { i => map.insert(i, i) } map.insert(Int.MaxValue, Int.MaxValue) + assert(map.numSpills > 0, "map did not spill") val it = map.iterator while (it.hasNext) { @@ -379,14 +386,17 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { } test("spilling with null keys and values") { + val size = 1000 val conf = createSparkConf(loadDefaults = true) + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = createExternalMap[Int] - map.insertAll((1 to 100000).iterator.map(i => (i, i))) + map.insertAll((1 to size).iterator.map(i => (i, i))) map.insert(null.asInstanceOf[Int], 1) map.insert(1, null.asInstanceOf[Int]) map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int]) + assert(map.numSpills > 0, "map did not spill") val it = map.iterator while (it.hasNext) { @@ -397,17 +407,22 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { } test("external aggregation updates peak execution memory") { + val spillThreshold = 1000 val conf = createSparkConf(loadDefaults = false) .set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter - .set("spark.testing.memory", (10 * 1024 * 1024).toString) + .set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString) sc = new SparkContext("local", 
"test", conf) // No spilling AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without spilling") { - sc.parallelize(1 to 10, 2).map { i => (i, i) }.reduceByKey(_ + _).count() + assertNotSpilled(sc, "verify peak memory") { + sc.parallelize(1 to spillThreshold / 2, 2).map { i => (i, i) }.reduceByKey(_ + _).count() + } } // With spilling AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map with spilling") { - sc.parallelize(1 to 1000 * 1000, 2).map { i => (i, i) }.reduceByKey(_ + _).count() + assertSpilled(sc, "verify peak memory") { + sc.parallelize(1 to spillThreshold * 3, 2).map { i => (i, i) }.reduceByKey(_ + _).count() + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/3b364ff0/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 651c7ea..e2cb791 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -18,535 +18,92 @@ package org.apache.spark.util.collection import scala.collection.mutable.ArrayBuffer - import scala.util.Random import org.apache.spark._ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} -// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078) class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { - private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = { - val conf = new SparkConf(loadDefaults) - if (kryo) { - conf.set("spark.serializer", classOf[KryoSerializer].getName) - } else { - // Make the Java serializer write a reset instruction (TC_RESET) after each object to test - // for a bug we had with bytes written past the last object in a batch (SPARK-2792) - conf.set("spark.serializer.objectStreamReset", "1") - conf.set("spark.serializer", classOf[JavaSerializer].getName) - } - conf.set("spark.shuffle.sort.bypassMergeThreshold", "0") - // Ensure that we actually have multiple batches per spill file - conf.set("spark.shuffle.spill.batchSize", "10") - conf.set("spark.testing.memory", "2000000") - conf - } - - test("empty data stream with kryo ser") { - emptyDataStream(createSparkConf(false, true)) - } - - test("empty data stream with java ser") { - emptyDataStream(createSparkConf(false, false)) - } - - def emptyDataStream(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - - val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) - val ord = implicitly[Ordering[Int]] - - // Both aggregator and ordering - val sorter = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(3)), Some(ord), None) - assert(sorter.iterator.toSeq === Seq()) - sorter.stop() - - // Only aggregator - val sorter2 = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(3)), None, None) - assert(sorter2.iterator.toSeq === Seq()) - sorter2.stop() - - // Only ordering - val sorter3 = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) - assert(sorter3.iterator.toSeq === Seq()) - sorter3.stop() - - // Neither aggregator nor ordering - val sorter4 = new ExternalSorter[Int, Int, Int]( - None, 
Some(new HashPartitioner(3)), None, None) - assert(sorter4.iterator.toSeq === Seq()) - sorter4.stop() - } + import TestUtils.{assertNotSpilled, assertSpilled} - test("few elements per partition with kryo ser") { - fewElementsPerPartition(createSparkConf(false, true)) - } + testWithMultipleSer("empty data stream")(emptyDataStream) - test("few elements per partition with java ser") { - fewElementsPerPartition(createSparkConf(false, false)) - } + testWithMultipleSer("few elements per partition")(fewElementsPerPartition) - def fewElementsPerPartition(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - - val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) - val ord = implicitly[Ordering[Int]] - val elements = Set((1, 1), (2, 2), (5, 5)) - val expected = Set( - (0, Set()), (1, Set((1, 1))), (2, Set((2, 2))), (3, Set()), (4, Set()), - (5, Set((5, 5))), (6, Set())) - - // Both aggregator and ordering - val sorter = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(7)), Some(ord), None) - sorter.insertAll(elements.iterator) - assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) - sorter.stop() - - // Only aggregator - val sorter2 = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(7)), None, None) - sorter2.insertAll(elements.iterator) - assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) - sorter2.stop() + testWithMultipleSer("empty partitions with spilling")(emptyPartitionsWithSpilling) - // Only ordering - val sorter3 = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(7)), Some(ord), None) - sorter3.insertAll(elements.iterator) - assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) - sorter3.stop() - - // Neither aggregator nor ordering - val sorter4 = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(7)), None, None) - sorter4.insertAll(elements.iterator) - assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) - sorter4.stop() - } - - test("empty partitions with spilling with kryo ser") { - emptyPartitionsWithSpilling(createSparkConf(false, true)) + // Load defaults, otherwise SPARK_HOME is not found + testWithMultipleSer("spilling in local cluster", loadDefaults = true) { + (conf: SparkConf) => testSpillingInLocalCluster(conf, 2) } - test("empty partitions with spilling with java ser") { - emptyPartitionsWithSpilling(createSparkConf(false, false)) - } - - def emptyPartitionsWithSpilling(conf: SparkConf) { - conf.set("spark.shuffle.spill.initialMemoryThreshold", "512") - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - - val ord = implicitly[Ordering[Int]] - val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2)) - - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(7)), Some(ord), None) - sorter.insertAll(elements) - assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled - val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList)) - assert(iter.next() === (0, Nil)) - assert(iter.next() === (1, List((1, 1)))) - assert(iter.next() === (2, (0 until 100000).map(x => (2, 2)).toList)) - assert(iter.next() === (3, Nil)) - assert(iter.next() === (4, Nil)) - assert(iter.next() 
=== (5, List((5, 5)))) - assert(iter.next() === (6, Nil)) - sorter.stop() - } - - test("spilling in local cluster with kryo ser") { - // Load defaults, otherwise SPARK_HOME is not found - testSpillingInLocalCluster(createSparkConf(true, true)) - } - - test("spilling in local cluster with java ser") { - // Load defaults, otherwise SPARK_HOME is not found - testSpillingInLocalCluster(createSparkConf(true, false)) - } - - def testSpillingInLocalCluster(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) - - // reduceByKey - should spill ~8 times - val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i)) - val resultA = rddA.reduceByKey(math.max).collect() - assert(resultA.length == 50000) - resultA.foreach { case(k, v) => - if (v != k * 2 + 1) { - fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}") - } - } - - // groupByKey - should spill ~17 times - val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i)) - val resultB = rddB.groupByKey().collect() - assert(resultB.length == 25000) - resultB.foreach { case(i, seq) => - val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3) - if (seq.toSet != expected) { - fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}") - } - } - - // cogroup - should spill ~7 times - val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i)) - val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i)) - val resultC = rddC1.cogroup(rddC2).collect() - assert(resultC.length == 10000) - resultC.foreach { case(i, (seq1, seq2)) => - i match { - case 0 => - assert(seq1.toSet == Set[Int](0)) - assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000)) - case 1 => - assert(seq1.toSet == Set[Int](1)) - assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001)) - case 5000 => - assert(seq1.toSet == Set[Int](5000)) - assert(seq2.toSet == Set[Int]()) - case 9999 => - assert(seq1.toSet == Set[Int](9999)) - assert(seq2.toSet == Set[Int]()) - case _ => - } - } - - // larger cogroup - should spill ~7 times - val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i)) - val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i)) - val resultD = rddD1.cogroup(rddD2).collect() - assert(resultD.length == 5000) - resultD.foreach { case(i, (seq1, seq2)) => - val expected = Set(i * 2, i * 2 + 1) - if (seq1.toSet != expected) { - fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}") - } - if (seq2.toSet != expected) { - fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}") - } - } - - // sortByKey - should spill ~17 times - val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i)) - val resultE = rddE.sortByKey().collect().toSeq - assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq) - } - - test("spilling in local cluster with many reduce tasks with kryo ser") { - spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, true)) - } - - test("spilling in local cluster with many reduce tasks with java ser") { - spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, false)) - } - - def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) - - // reduceByKey - should spill ~4 times per executor - val rddA = sc.parallelize(0 until 
100000).map(i => (i/2, i)) - val resultA = rddA.reduceByKey(math.max _, 100).collect() - assert(resultA.length == 50000) - resultA.foreach { case(k, v) => - if (v != k * 2 + 1) { - fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}") - } - } - - // groupByKey - should spill ~8 times per executor - val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i)) - val resultB = rddB.groupByKey(100).collect() - assert(resultB.length == 25000) - resultB.foreach { case(i, seq) => - val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3) - if (seq.toSet != expected) { - fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}") - } - } - - // cogroup - should spill ~4 times per executor - val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i)) - val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i)) - val resultC = rddC1.cogroup(rddC2, 100).collect() - assert(resultC.length == 10000) - resultC.foreach { case(i, (seq1, seq2)) => - i match { - case 0 => - assert(seq1.toSet == Set[Int](0)) - assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000)) - case 1 => - assert(seq1.toSet == Set[Int](1)) - assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001)) - case 5000 => - assert(seq1.toSet == Set[Int](5000)) - assert(seq2.toSet == Set[Int]()) - case 9999 => - assert(seq1.toSet == Set[Int](9999)) - assert(seq2.toSet == Set[Int]()) - case _ => - } - } - - // larger cogroup - should spill ~4 times per executor - val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i)) - val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i)) - val resultD = rddD1.cogroup(rddD2).collect() - assert(resultD.length == 5000) - resultD.foreach { case(i, (seq1, seq2)) => - val expected = Set(i * 2, i * 2 + 1) - if (seq1.toSet != expected) { - fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}") - } - if (seq2.toSet != expected) { - fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}") - } - } - - // sortByKey - should spill ~8 times per executor - val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i)) - val resultE = rddE.sortByKey().collect().toSeq - assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq) + testWithMultipleSer("spilling in local cluster with many reduce tasks", loadDefaults = true) { + (conf: SparkConf) => testSpillingInLocalCluster(conf, 100) } test("cleanup of intermediate files in sorter") { - val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager - - val ord = implicitly[Ordering[Int]] - - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) - sorter.insertAll((0 until 120000).iterator.map(i => (i, i))) - assert(diskBlockManager.getAllFiles().length > 0) - sorter.stop() - assert(diskBlockManager.getAllBlocks().length === 0) - - val sorter2 = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) - sorter2.insertAll((0 until 120000).iterator.map(i => (i, i))) - assert(diskBlockManager.getAllFiles().length > 0) - assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet) - sorter2.stop() - assert(diskBlockManager.getAllBlocks().length === 0) + cleanupIntermediateFilesInSorter(withFailures = false) } - 
test("cleanup of intermediate files in sorter if there are errors") { - val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager - - val ord = implicitly[Ordering[Int]] - - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) - intercept[SparkException] { - sorter.insertAll((0 until 120000).iterator.map(i => { - if (i == 119990) { - throw new SparkException("Intentional failure") - } - (i, i) - })) - } - assert(diskBlockManager.getAllFiles().length > 0) - sorter.stop() - assert(diskBlockManager.getAllBlocks().length === 0) + test("cleanup of intermediate files in sorter with failures") { + cleanupIntermediateFilesInSorter(withFailures = true) } test("cleanup of intermediate files in shuffle") { - val conf = createSparkConf(false, false) - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager - - val data = sc.parallelize(0 until 100000, 2).map(i => (i, i)) - assert(data.reduceByKey(_ + _).count() === 100000) - - // After the shuffle, there should be only 4 files on disk: our two map output files and - // their index files. All other intermediate files should've been deleted. - assert(diskBlockManager.getAllFiles().length === 4) - } - - test("cleanup of intermediate files in shuffle with errors") { - val conf = createSparkConf(false, false) - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager - - val data = sc.parallelize(0 until 100000, 2).map(i => { - if (i == 99990) { - throw new Exception("Intentional failure") - } - (i, i) - }) - intercept[SparkException] { - data.reduceByKey(_ + _).count() - } - - // After the shuffle, there should be only 2 files on disk: the output of task 1 and its index. - // All other files (map 2's output and intermediate merge files) should've been deleted. 
- assert(diskBlockManager.getAllFiles().length === 2) - } - - test("no partial aggregation or sorting with kryo ser") { - noPartialAggregationOrSorting(createSparkConf(false, true)) - } - - test("no partial aggregation or sorting with java ser") { - noPartialAggregationOrSorting(createSparkConf(false, false)) - } - - def noPartialAggregationOrSorting(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - - val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None) - sorter.insertAll((0 until 100000).iterator.map(i => (i / 4, i))) - val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet - val expected = (0 until 3).map(p => { - (p, (0 until 100000).map(i => (i / 4, i)).filter(_._1 % 3 == p).toSet) - }).toSet - assert(results === expected) - } - - test("partial aggregation without spill with kryo ser") { - partialAggregationWithoutSpill(createSparkConf(false, true)) - } - - test("partial aggregation without spill with java ser") { - partialAggregationWithoutSpill(createSparkConf(false, false)) - } - - def partialAggregationWithoutSpill(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - - val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) - val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None) - sorter.insertAll((0 until 100).iterator.map(i => (i / 2, i))) - val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet - val expected = (0 until 3).map(p => { - (p, (0 until 50).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet) - }).toSet - assert(results === expected) + cleanupIntermediateFilesInShuffle(withFailures = false) } - test("partial aggregation with spill, no ordering with kryo ser") { - partialAggregationWIthSpillNoOrdering(createSparkConf(false, true)) + test("cleanup of intermediate files in shuffle with failures") { + cleanupIntermediateFilesInShuffle(withFailures = true) } - test("partial aggregation with spill, no ordering with java ser") { - partialAggregationWIthSpillNoOrdering(createSparkConf(false, false)) + testWithMultipleSer("no sorting or partial aggregation") { (conf: SparkConf) => + basicSorterTest(conf, withPartialAgg = false, withOrdering = false, withSpilling = false) } - def partialAggregationWIthSpillNoOrdering(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - - val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) - val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None) - sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i))) - val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet - val expected = (0 until 3).map(p => { - (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet) - }).toSet - assert(results === expected) + testWithMultipleSer("no sorting or partial aggregation with spilling") { (conf: SparkConf) => + basicSorterTest(conf, withPartialAgg = false, withOrdering = false, withSpilling = true) } - test("partial aggregation with spill, with ordering with kryo ser") { - partialAggregationWithSpillWithOrdering(createSparkConf(false, true)) + testWithMultipleSer("sorting, no partial 
aggregation") { (conf: SparkConf) => + basicSorterTest(conf, withPartialAgg = false, withOrdering = true, withSpilling = false) } - - test("partial aggregation with spill, with ordering with java ser") { - partialAggregationWithSpillWithOrdering(createSparkConf(false, false)) + testWithMultipleSer("sorting, no partial aggregation with spilling") { (conf: SparkConf) => + basicSorterTest(conf, withPartialAgg = false, withOrdering = true, withSpilling = true) } - def partialAggregationWithSpillWithOrdering(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - - val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) - val ord = implicitly[Ordering[Int]] - val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None) - - // avoid combine before spill - sorter.insertAll((0 until 50000).iterator.map(i => (i , 2 * i))) - sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i + 1))) - val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet - val expected = (0 until 3).map(p => { - (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet) - }).toSet - assert(results === expected) + testWithMultipleSer("partial aggregation, no sorting") { (conf: SparkConf) => + basicSorterTest(conf, withPartialAgg = true, withOrdering = false, withSpilling = false) } - test("sorting without aggregation, no spill with kryo ser") { - sortingWithoutAggregationNoSpill(createSparkConf(false, true)) + testWithMultipleSer("partial aggregation, no sorting with spilling") { (conf: SparkConf) => + basicSorterTest(conf, withPartialAgg = true, withOrdering = false, withSpilling = true) } - test("sorting without aggregation, no spill with java ser") { - sortingWithoutAggregationNoSpill(createSparkConf(false, false)) + testWithMultipleSer("partial aggregation and sorting") { (conf: SparkConf) => + basicSorterTest(conf, withPartialAgg = true, withOrdering = true, withSpilling = false) } - def sortingWithoutAggregationNoSpill(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - - val ord = implicitly[Ordering[Int]] - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) - sorter.insertAll((0 until 100).iterator.map(i => (i, i))) - val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq - val expected = (0 until 3).map(p => { - (p, (0 until 100).map(i => (i, i)).filter(_._1 % 3 == p).toSeq) - }).toSeq - assert(results === expected) - } - - test("sorting without aggregation, with spill with kryo ser") { - sortingWithoutAggregationWithSpill(createSparkConf(false, true)) - } - - test("sorting without aggregation, with spill with java ser") { - sortingWithoutAggregationWithSpill(createSparkConf(false, false)) + testWithMultipleSer("partial aggregation and sorting with spilling") { (conf: SparkConf) => + basicSorterTest(conf, withPartialAgg = true, withOrdering = true, withSpilling = true) } - def sortingWithoutAggregationWithSpill(conf: SparkConf) { - conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") - sc = new SparkContext("local", "test", conf) - - val ord = implicitly[Ordering[Int]] - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) - sorter.insertAll((0 until 
100000).iterator.map(i => (i, i))) - val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq - val expected = (0 until 3).map(p => { - (p, (0 until 100000).map(i => (i, i)).filter(_._1 % 3 == p).toSeq) - }).toSeq - assert(results === expected) - } + testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)( + sortWithoutBreakingSortingContracts) test("spilling with hash collisions") { - val conf = createSparkConf(true, false) + val size = 1000 + val conf = createSparkConf(loadDefaults = true, kryo = false) + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i) def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i - def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String]) - : ArrayBuffer[String] = buffer1 ++= buffer2 + def mergeCombiners( + buffer1: ArrayBuffer[String], + buffer2: ArrayBuffer[String]): ArrayBuffer[String] = buffer1 ++= buffer2 val agg = new Aggregator[String, String, ArrayBuffer[String]]( createCombiner _, mergeValue _, mergeCombiners _) @@ -574,10 +131,11 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { assert(w1.hashCode === w2.hashCode) } - val toInsert = (1 to 100000).iterator.map(_.toString).map(s => (s, s)) ++ + val toInsert = (1 to size).iterator.map(_.toString).map(s => (s, s)) ++ collisionPairs.iterator ++ collisionPairs.iterator.map(_.swap) sorter.insertAll(toInsert) + assert(sorter.numSpills > 0, "sorter did not spill") // A map of collision pairs in both directions val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap @@ -592,21 +150,21 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { assert(kv._2.equals(expectedValue)) count += 1 } - assert(count === 100000 + collisionPairs.size * 2) + assert(count === size + collisionPairs.size * 2) } test("spilling with many hash collisions") { - val conf = createSparkConf(true, false) + val size = 1000 + val conf = createSparkConf(loadDefaults = true, kryo = false) + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) - val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _) val sorter = new ExternalSorter[FixedHashObject, Int, Int](Some(agg), None, None, None) - // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes // problems if the map fails to group together the objects with the same code (SPARK-2043). 
- val toInsert = for (i <- 1 to 10; j <- 1 to 10000) yield (FixedHashObject(j, j % 2), 1) + val toInsert = for (i <- 1 to 10; j <- 1 to size) yield (FixedHashObject(j, j % 2), 1) sorter.insertAll(toInsert.iterator) - + assert(sorter.numSpills > 0, "sorter did not spill") val it = sorter.iterator var count = 0 while (it.hasNext) { @@ -614,11 +172,13 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { assert(kv._2 === 10) count += 1 } - assert(count === 10000) + assert(count === size) } test("spilling with hash collisions using the Int.MaxValue key") { - val conf = createSparkConf(true, false) + val size = 1000 + val conf = createSparkConf(loadDefaults = true, kryo = false) + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i) @@ -629,10 +189,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) val sorter = new ExternalSorter[Int, Int, ArrayBuffer[Int]](Some(agg), None, None, None) - sorter.insertAll( - (1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue))) - + (1 to size).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue))) + assert(sorter.numSpills > 0, "sorter did not spill") val it = sorter.iterator while (it.hasNext) { // Should not throw NoSuchElementException @@ -641,7 +200,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } test("spilling with null keys and values") { - val conf = createSparkConf(true, false) + val size = 1000 + val conf = createSparkConf(loadDefaults = true, kryo = false) + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i) @@ -655,12 +216,12 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val sorter = new ExternalSorter[String, String, ArrayBuffer[String]]( Some(agg), None, None, None) - sorter.insertAll((1 to 100000).iterator.map(i => (i.toString, i.toString)) ++ Iterator( + sorter.insertAll((1 to size).iterator.map(i => (i.toString, i.toString)) ++ Iterator( (null.asInstanceOf[String], "1"), ("1", null.asInstanceOf[String]), (null.asInstanceOf[String], null.asInstanceOf[String]) )) - + assert(sorter.numSpills > 0, "sorter did not spill") val it = sorter.iterator while (it.hasNext) { // Should not throw NullPointerException @@ -668,16 +229,301 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } } - test("sort without breaking sorting contracts with kryo ser") { - sortWithoutBreakingSortingContracts(createSparkConf(true, true)) + /* ============================= * + | Helper test utility methods | + * ============================= */ + + private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = { + val conf = new SparkConf(loadDefaults) + if (kryo) { + conf.set("spark.serializer", classOf[KryoSerializer].getName) + } else { + // Make the Java serializer write a reset instruction (TC_RESET) after each object to test + // for a bug we had with bytes written past the last object in a batch (SPARK-2792) + conf.set("spark.serializer.objectStreamReset", "1") + conf.set("spark.serializer", classOf[JavaSerializer].getName) + } + 
conf.set("spark.shuffle.sort.bypassMergeThreshold", "0") + // Ensure that we actually have multiple batches per spill file + conf.set("spark.shuffle.spill.batchSize", "10") + conf.set("spark.shuffle.spill.initialMemoryThreshold", "512") + conf + } + + /** + * Run a test multiple times, each time with a different serializer. + */ + private def testWithMultipleSer( + name: String, + loadDefaults: Boolean = false)(body: (SparkConf => Unit)): Unit = { + test(name + " with kryo ser") { + body(createSparkConf(loadDefaults, kryo = true)) + } + test(name + " with java ser") { + body(createSparkConf(loadDefaults, kryo = false)) + } } - test("sort without breaking sorting contracts with java ser") { - sortWithoutBreakingSortingContracts(createSparkConf(true, false)) + /* =========================================== * + | Helper methods that contain the test body | + * =========================================== */ + + private def emptyDataStream(conf: SparkConf) { + conf.set("spark.shuffle.manager", "sort") + sc = new SparkContext("local", "test", conf) + + val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) + val ord = implicitly[Ordering[Int]] + + // Both aggregator and ordering + val sorter = new ExternalSorter[Int, Int, Int]( + Some(agg), Some(new HashPartitioner(3)), Some(ord), None) + assert(sorter.iterator.toSeq === Seq()) + sorter.stop() + + // Only aggregator + val sorter2 = new ExternalSorter[Int, Int, Int]( + Some(agg), Some(new HashPartitioner(3)), None, None) + assert(sorter2.iterator.toSeq === Seq()) + sorter2.stop() + + // Only ordering + val sorter3 = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(3)), Some(ord), None) + assert(sorter3.iterator.toSeq === Seq()) + sorter3.stop() + + // Neither aggregator nor ordering + val sorter4 = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(3)), None, None) + assert(sorter4.iterator.toSeq === Seq()) + sorter4.stop() + } + + private def fewElementsPerPartition(conf: SparkConf) { + conf.set("spark.shuffle.manager", "sort") + sc = new SparkContext("local", "test", conf) + + val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) + val ord = implicitly[Ordering[Int]] + val elements = Set((1, 1), (2, 2), (5, 5)) + val expected = Set( + (0, Set()), (1, Set((1, 1))), (2, Set((2, 2))), (3, Set()), (4, Set()), + (5, Set((5, 5))), (6, Set())) + + // Both aggregator and ordering + val sorter = new ExternalSorter[Int, Int, Int]( + Some(agg), Some(new HashPartitioner(7)), Some(ord), None) + sorter.insertAll(elements.iterator) + assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) + sorter.stop() + + // Only aggregator + val sorter2 = new ExternalSorter[Int, Int, Int]( + Some(agg), Some(new HashPartitioner(7)), None, None) + sorter2.insertAll(elements.iterator) + assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) + sorter2.stop() + + // Only ordering + val sorter3 = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(7)), Some(ord), None) + sorter3.insertAll(elements.iterator) + assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) + sorter3.stop() + + // Neither aggregator nor ordering + val sorter4 = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(7)), None, None) + sorter4.insertAll(elements.iterator) + assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) + sorter4.stop() + } + + private def 
emptyPartitionsWithSpilling(conf: SparkConf) { + val size = 1000 + conf.set("spark.shuffle.manager", "sort") + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + sc = new SparkContext("local", "test", conf) + + val ord = implicitly[Ordering[Int]] + val elements = Iterator((1, 1), (5, 5)) ++ (0 until size).iterator.map(x => (2, 2)) + + val sorter = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(7)), Some(ord), None) + sorter.insertAll(elements) + assert(sorter.numSpills > 0, "sorter did not spill") + val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList)) + assert(iter.next() === (0, Nil)) + assert(iter.next() === (1, List((1, 1)))) + assert(iter.next() === (2, (0 until 1000).map(x => (2, 2)).toList)) + assert(iter.next() === (3, Nil)) + assert(iter.next() === (4, Nil)) + assert(iter.next() === (5, List((5, 5)))) + assert(iter.next() === (6, Nil)) + sorter.stop() + } + + private def testSpillingInLocalCluster(conf: SparkConf, numReduceTasks: Int) { + val size = 5000 + conf.set("spark.shuffle.manager", "sort") + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString) + sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) + + assertSpilled(sc, "reduceByKey") { + val result = sc.parallelize(0 until size) + .map { i => (i / 2, i) } + .reduceByKey(math.max _, numReduceTasks) + .collect() + assert(result.length === size / 2) + result.foreach { case (k, v) => + val expected = k * 2 + 1 + assert(v === expected, s"Value for $k was wrong: expected $expected, got $v") + } + } + + assertSpilled(sc, "groupByKey") { + val result = sc.parallelize(0 until size) + .map { i => (i / 2, i) } + .groupByKey(numReduceTasks) + .collect() + assert(result.length == size / 2) + result.foreach { case (i, seq) => + val actual = seq.toSet + val expected = Set(i * 2, i * 2 + 1) + assert(actual === expected, s"Value for $i was wrong: expected $expected, got $actual") + } + } + + assertSpilled(sc, "cogroup") { + val rdd1 = sc.parallelize(0 until size).map { i => (i / 2, i) } + val rdd2 = sc.parallelize(0 until size).map { i => (i / 2, i) } + val result = rdd1.cogroup(rdd2, numReduceTasks).collect() + assert(result.length === size / 2) + result.foreach { case (i, (seq1, seq2)) => + val actual1 = seq1.toSet + val actual2 = seq2.toSet + val expected = Set(i * 2, i * 2 + 1) + assert(actual1 === expected, s"Value 1 for $i was wrong: expected $expected, got $actual1") + assert(actual2 === expected, s"Value 2 for $i was wrong: expected $expected, got $actual2") + } + } + + assertSpilled(sc, "sortByKey") { + val result = sc.parallelize(0 until size) + .map { i => (i / 2, i) } + .sortByKey(numPartitions = numReduceTasks) + .collect() + val expected = (0 until size).map { i => (i / 2, i) }.toArray + assert(result.length === size) + result.zipWithIndex.foreach { case ((k, _), i) => + val (expectedKey, _) = expected(i) + assert(k === expectedKey, s"Value for $i was wrong: expected $expectedKey, got $k") + } + } + } + + private def cleanupIntermediateFilesInSorter(withFailures: Boolean): Unit = { + val size = 1200 + val conf = createSparkConf(loadDefaults = false, kryo = false) + conf.set("spark.shuffle.manager", "sort") + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString) + sc = new SparkContext("local", "test", conf) + val diskBlockManager = sc.env.blockManager.diskBlockManager + val ord = implicitly[Ordering[Int]] + val expectedSize = if (withFailures) size - 1 else size + val sorter = new 
+    val sorter = new ExternalSorter[Int, Int, Int](
+      None, Some(new HashPartitioner(3)), Some(ord), None)
+    if (withFailures) {
+      intercept[SparkException] {
+        sorter.insertAll((0 until size).iterator.map { i =>
+          if (i == size - 1) { throw new SparkException("intentional failure") }
+          (i, i)
+        })
+      }
+    } else {
+      sorter.insertAll((0 until size).iterator.map(i => (i, i)))
+    }
+    assert(sorter.iterator.toSet === (0 until expectedSize).map(i => (i, i)).toSet)
+    assert(sorter.numSpills > 0, "sorter did not spill")
+    assert(diskBlockManager.getAllFiles().nonEmpty, "sorter did not spill")
+    sorter.stop()
+    assert(diskBlockManager.getAllFiles().isEmpty, "spilled files were not cleaned up")
+  }
+
+  private def cleanupIntermediateFilesInShuffle(withFailures: Boolean): Unit = {
+    val size = 1200
+    val conf = createSparkConf(loadDefaults = false, kryo = false)
+    conf.set("spark.shuffle.manager", "sort")
+    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
+    sc = new SparkContext("local", "test", conf)
+    val diskBlockManager = sc.env.blockManager.diskBlockManager
+    val data = sc.parallelize(0 until size, 2).map { i =>
+      if (withFailures && i == size - 1) {
+        throw new SparkException("intentional failure")
+      }
+      (i, i)
+    }
+
+    assertSpilled(sc, "test shuffle cleanup") {
+      if (withFailures) {
+        intercept[SparkException] {
+          data.reduceByKey(_ + _).count()
+        }
+        // After the shuffle, there should be only 2 files on disk: the output of task 1 and
+        // its index. All other files (map 2's output and intermediate merge files) should
+        // have been deleted.
+        assert(diskBlockManager.getAllFiles().length === 2)
+      } else {
+        assert(data.reduceByKey(_ + _).count() === size)
+        // After the shuffle, there should be only 4 files on disk: the output of both tasks
+        // and their indices. All intermediate merge files should have been deleted.
+        assert(diskBlockManager.getAllFiles().length === 4)
+      }
+    }
+  }
+
+  private def basicSorterTest(
+      conf: SparkConf,
+      withPartialAgg: Boolean,
+      withOrdering: Boolean,
+      withSpilling: Boolean) {
+    val size = 1000
+    if (withSpilling) {
+      conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
+    }
+    conf.set("spark.shuffle.manager", "sort")
+    sc = new SparkContext("local", "test", conf)
+    val agg =
+      if (withPartialAgg) {
+        Some(new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j))
+      } else {
+        None
+      }
+    val ord = if (withOrdering) Some(implicitly[Ordering[Int]]) else None
+    val sorter = new ExternalSorter[Int, Int, Int](agg, Some(new HashPartitioner(3)), ord, None)
+    sorter.insertAll((0 until size).iterator.map { i => (i / 4, i) })
+    if (withSpilling) {
+      assert(sorter.numSpills > 0, "sorter did not spill")
+    } else {
+      assert(sorter.numSpills === 0, "sorter spilled")
+    }
+    val results = sorter.partitionedIterator.map { case (p, vs) => (p, vs.toSet) }.toSet
+    val expected = (0 until 3).map { p =>
+      var v = (0 until size).map { i => (i / 4, i) }.filter { case (k, _) => k % 3 == p }.toSet
+      if (withPartialAgg) {
+        v = v.groupBy(_._1).mapValues { s => s.map(_._2).sum }.toSet
+      }
+      (p, v.toSet)
+    }.toSet
+    assert(results === expected)
   }
 
   private def sortWithoutBreakingSortingContracts(conf: SparkConf) {
+    val size = 100000
+    val conf = createSparkConf(loadDefaults = true, kryo = false)
     conf.set("spark.shuffle.manager", "sort")
+    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     // Using wrongOrdering to show integer overflow introduced exception.
@@ -690,17 +536,18 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
       }
     }
-    val testData = Array.tabulate(100000) { _ => rand.nextInt().toString }
+    val testData = Array.tabulate(size) { _ => rand.nextInt().toString }
 
     val sorter1 = new ExternalSorter[String, String, String](
       None, None, Some(wrongOrdering), None)
     val thrown = intercept[IllegalArgumentException] {
       sorter1.insertAll(testData.iterator.map(i => (i, i)))
+      assert(sorter1.numSpills > 0, "sorter did not spill")
       sorter1.iterator
     }
-    assert(thrown.getClass() === classOf[IllegalArgumentException])
-    assert(thrown.getMessage().contains("Comparison method violates its general contract"))
+    assert(thrown.getClass === classOf[IllegalArgumentException])
+    assert(thrown.getMessage.contains("Comparison method violates its general contract"))
     sorter1.stop()
 
     // Using aggregation and external spill to make sure ExternalSorter using
@@ -716,6 +563,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val sorter2 = new ExternalSorter[String, String, ArrayBuffer[String]](
       Some(agg), None, None, None)
     sorter2.insertAll(testData.iterator.map(i => (i, i)))
+    assert(sorter2.numSpills > 0, "sorter did not spill")
 
     // To validate the hash ordering of key
     var minKey = Int.MinValue
@@ -729,12 +577,23 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("sorting updates peak execution memory") {
+    val spillThreshold = 1000
     val conf = createSparkConf(loadDefaults = false, kryo = false)
       .set("spark.shuffle.manager", "sort")
+      .set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString)
     sc = new SparkContext("local", "test", conf)
     // Avoid aggregating here to make sure we're not also using ExternalAppendOnlyMap
-    AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external sorter") {
-      sc.parallelize(1 to 1000, 2).repartition(100).count()
+    // No spilling
+    AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external sorter without spilling") {
+      assertNotSpilled(sc, "verify peak memory") {
+        sc.parallelize(1 to spillThreshold / 2, 2).repartition(100).count()
+      }
+    }
+    // With spilling
+    AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external sorter with spilling") {
+      assertSpilled(sc, "verify peak memory") {
+        sc.parallelize(1 to spillThreshold * 3, 2).repartition(100).count()
+      }
     }
   }
 }
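
Boiled down, the sorter-level pattern the refactored suite uses throughout is: set a low spark.shuffle.spill.numElementsForceSpillThreshold, feed the sorter more records than that, and assert on numSpills before checking the output. The snippet below is a minimal illustrative sketch, not part of this patch; it assumes it runs inside Spark's own test sources (ExternalSorter and numSpills are internal APIs), and the sizes are arbitrary:

  import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
  import org.apache.spark.util.collection.ExternalSorter

  // Illustrative only: force a spill after size / 4 records so even a small
  // dataset spills, then fail loudly if no spill actually happened.
  val size = 1200
  val conf = new SparkConf(false)
    .set("spark.shuffle.manager", "sort")
    .set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
  val sc = new SparkContext("local", "test", conf)
  try {
    val ord = implicitly[Ordering[Int]]
    val sorter = new ExternalSorter[Int, Int, Int](
      None, Some(new HashPartitioner(3)), Some(ord), None)
    sorter.insertAll((0 until size).iterator.map(i => (i, i)))
    assert(sorter.numSpills > 0, "sorter did not spill")
    assert(sorter.iterator.toSet === (0 until size).map(i => (i, i)).toSet)
    sorter.stop()
  } finally {
    sc.stop()
  }

The local-cluster tests above follow the same recipe at the RDD level, with the assertion moved into the assertSpilled wrapper.
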
sorter") { - sc.parallelize(1 to 1000, 2).repartition(100).count() + // No spilling + AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external sorter without spilling") { + assertNotSpilled(sc, "verify peak memory") { + sc.parallelize(1 to spillThreshold / 2, 2).repartition(100).count() + } + } + // With spilling + AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external sorter with spilling") { + assertSpilled(sc, "verify peak memory") { + sc.parallelize(1 to spillThreshold * 3, 2).repartition(100).count() + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/3b364ff0/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala index 835f52f..c4358f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala @@ -68,6 +68,8 @@ private class GrantEverythingMemoryManager extends MemoryManager { blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true + override def releaseExecutionMemory(numBytes: Long): Unit = { } + override def releaseStorageMemory(numBytes: Long): Unit = { } override def maxExecutionMemory: Long = Long.MaxValue override def maxStorageMemory: Long = Long.MaxValue } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
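
At the job level, the same guarantee is expressed through the new assertSpilled / assertNotSpilled helpers used above. A minimal sketch, again illustrative only: it assumes it lives where the private[spark] TestUtils is visible and mirrors the reduceByKey case exercised in ExternalSorterSuite; the concrete sizes and the small non-spilling job are assumptions chosen for illustration:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}

  // Illustrative only: assert spill behaviour of whole jobs, with spilling
  // forced by a low element threshold as in the tests above.
  val size = 5000
  val conf = new SparkConf(false)
    .set("spark.shuffle.manager", "sort")
    .set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
  val sc = new SparkContext("local", "test", conf)
  try {
    // Passes only if at least one stage of this job actually spilled.
    assertSpilled(sc, "reduceByKey") {
      sc.parallelize(0 until size, 2).map(i => (i / 2, i)).reduceByKey(math.max _).collect()
    }
    // A job far below the forced-spill threshold should report no spills.
    assertNotSpilled(sc, "small reduceByKey") {
      sc.parallelize(0 until 10, 2).map(i => (i, i)).reduceByKey(_ + _).collect()
    }
  } finally {
    sc.stop()
  }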