Repository: spark Updated Branches: refs/heads/master 94e2064fa -> f6d06adf0
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index 341f56d..b92a302 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -33,7 +33,8 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark._ import org.apache.spark.executor.{TaskMetrics, ShuffleWriteMetrics} -import org.apache.spark.serializer.{SerializerInstance, Serializer, JavaSerializer} +import org.apache.spark.shuffle.IndexShuffleBlockResolver +import org.apache.spark.serializer.{JavaSerializer, SerializerInstance} import org.apache.spark.storage._ import org.apache.spark.util.Utils @@ -42,25 +43,31 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _ @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _ @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ + @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ + @Mock(answer = RETURNS_SMART_NULLS) private var dependency: ShuffleDependency[Int, Int, Int] = _ private var taskMetrics: TaskMetrics = _ - private var shuffleWriteMetrics: ShuffleWriteMetrics = _ private var tempDir: File = _ private var outputFile: File = _ private val conf: SparkConf = new SparkConf(loadDefaults = false) private val temporaryFilesCreated: mutable.Buffer[File] = new ArrayBuffer[File]() private val blockIdToFileMap: mutable.Map[BlockId, File] = new 
mutable.HashMap[BlockId, File] - private val shuffleBlockId: ShuffleBlockId = new ShuffleBlockId(0, 0, 0) - private val serializer: Serializer = new JavaSerializer(conf) + private var shuffleHandle: BypassMergeSortShuffleHandle[Int, Int] = _ override def beforeEach(): Unit = { tempDir = Utils.createTempDir() outputFile = File.createTempFile("shuffle", null, tempDir) - shuffleWriteMetrics = new ShuffleWriteMetrics taskMetrics = new TaskMetrics - taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics) MockitoAnnotations.initMocks(this) + shuffleHandle = new BypassMergeSortShuffleHandle[Int, Int]( + shuffleId = 0, + numMaps = 2, + dependency = dependency + ) + when(dependency.partitioner).thenReturn(new HashPartitioner(7)) + when(dependency.serializer).thenReturn(Some(new JavaSerializer(conf))) when(taskContext.taskMetrics()).thenReturn(taskMetrics) + when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) when(blockManager.diskBlockManager).thenReturn(diskBlockManager) when(blockManager.getDiskWriter( any[BlockId], @@ -107,18 +114,20 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte test("write empty iterator") { val writer = new BypassMergeSortShuffleWriter[Int, Int]( - new SparkConf(loadDefaults = false), blockManager, - new HashPartitioner(7), - shuffleWriteMetrics, - serializer + blockResolver, + shuffleHandle, + 0, // MapId + taskContext, + conf ) - writer.insertAll(Iterator.empty) - val partitionLengths = writer.writePartitionedFile(shuffleBlockId, taskContext, outputFile) - assert(partitionLengths.sum === 0) + writer.write(Iterator.empty) + writer.stop( /* success = */ true) + assert(writer.getPartitionLengths.sum === 0) assert(outputFile.exists()) assert(outputFile.length() === 0) assert(temporaryFilesCreated.isEmpty) + val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get assert(shuffleWriteMetrics.shuffleBytesWritten === 0) assert(shuffleWriteMetrics.shuffleRecordsWritten === 0) 
assert(taskMetrics.diskBytesSpilled === 0) @@ -129,17 +138,19 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte def records: Iterator[(Int, Int)] = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2)) val writer = new BypassMergeSortShuffleWriter[Int, Int]( - new SparkConf(loadDefaults = false), blockManager, - new HashPartitioner(7), - shuffleWriteMetrics, - serializer + blockResolver, + shuffleHandle, + 0, // MapId + taskContext, + conf ) - writer.insertAll(records) + writer.write(records) + writer.stop( /* success = */ true) assert(temporaryFilesCreated.nonEmpty) - val partitionLengths = writer.writePartitionedFile(shuffleBlockId, taskContext, outputFile) - assert(partitionLengths.sum === outputFile.length()) + assert(writer.getPartitionLengths.sum === outputFile.length()) assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted + val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length()) assert(shuffleWriteMetrics.shuffleRecordsWritten === records.length) assert(taskMetrics.diskBytesSpilled === 0) @@ -148,14 +159,15 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte test("cleanup of intermediate files after errors") { val writer = new BypassMergeSortShuffleWriter[Int, Int]( - new SparkConf(loadDefaults = false), blockManager, - new HashPartitioner(7), - shuffleWriteMetrics, - serializer + blockResolver, + shuffleHandle, + 0, // MapId + taskContext, + conf ) intercept[SparkException] { - writer.insertAll((0 until 100000).iterator.map(i => { + writer.write((0 until 100000).iterator.map(i => { if (i == 99990) { throw new SparkException("Intentional failure") } @@ -163,7 +175,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte })) } assert(temporaryFilesCreated.nonEmpty) - writer.stop() + writer.stop( /* success = */ 
false) assert(temporaryFilesCreated.count(_.exists()) === 0) } http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala new file mode 100644 index 0000000..8744a07 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.Matchers + +import org.apache.spark._ +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer} + +/** + * Tests for the fallback logic in SortShuffleManager. Actual tests of shuffling data are + * performed in other suites. 
+ */ +class SortShuffleManagerSuite extends SparkFunSuite with Matchers { + + import SortShuffleManager.canUseSerializedShuffle + + private class RuntimeExceptionAnswer extends Answer[Object] { + override def answer(invocation: InvocationOnMock): Object = { + throw new RuntimeException("Called non-stubbed method, " + invocation.getMethod.getName) + } + } + + private def shuffleDep( + partitioner: Partitioner, + serializer: Option[Serializer], + keyOrdering: Option[Ordering[Any]], + aggregator: Option[Aggregator[Any, Any, Any]], + mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = { + val dep = mock(classOf[ShuffleDependency[Any, Any, Any]], new RuntimeExceptionAnswer()) + doReturn(0).when(dep).shuffleId + doReturn(partitioner).when(dep).partitioner + doReturn(serializer).when(dep).serializer + doReturn(keyOrdering).when(dep).keyOrdering + doReturn(aggregator).when(dep).aggregator + doReturn(mapSideCombine).when(dep).mapSideCombine + dep + } + + test("supported shuffle dependencies for serialized shuffle") { + val kryo = Some(new KryoSerializer(new SparkConf())) + + assert(canUseSerializedShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = kryo, + keyOrdering = None, + aggregator = None, + mapSideCombine = false + ))) + + val rangePartitioner = mock(classOf[RangePartitioner[Any, Any]]) + when(rangePartitioner.numPartitions).thenReturn(2) + assert(canUseSerializedShuffle(shuffleDep( + partitioner = rangePartitioner, + serializer = kryo, + keyOrdering = None, + aggregator = None, + mapSideCombine = false + ))) + + // Shuffles with key orderings are supported as long as no aggregator is specified + assert(canUseSerializedShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = kryo, + keyOrdering = Some(mock(classOf[Ordering[Any]])), + aggregator = None, + mapSideCombine = false + ))) + + } + + test("unsupported shuffle dependencies for serialized shuffle") { + val kryo = Some(new KryoSerializer(new SparkConf())) + val 
java = Some(new JavaSerializer(new SparkConf())) + + // We only support serializers that support object relocation + assert(!canUseSerializedShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = java, + keyOrdering = None, + aggregator = None, + mapSideCombine = false + ))) + + // The serialized shuffle path does not support shuffles with more than 16 million output + // partitions, due to a limitation in its sorter implementation. + assert(!canUseSerializedShuffle(shuffleDep( + partitioner = new HashPartitioner( + SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE + 1), + serializer = kryo, + keyOrdering = None, + aggregator = None, + mapSideCombine = false + ))) + + // We do not support shuffles that perform aggregation + assert(!canUseSerializedShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = kryo, + keyOrdering = None, + aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])), + mapSideCombine = false + ))) + assert(!canUseSerializedShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = kryo, + keyOrdering = Some(mock(classOf[Ordering[Any]])), + aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])), + mapSideCombine = true + ))) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala deleted file mode 100644 index 34b4984..0000000 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.sort - -import org.mockito.Mockito._ - -import org.apache.spark.{Aggregator, SparkConf, SparkFunSuite} - -class SortShuffleWriterSuite extends SparkFunSuite { - - import SortShuffleWriter._ - - test("conditions for bypassing merge-sort") { - val conf = new SparkConf(loadDefaults = false) - val agg = mock(classOf[Aggregator[_, _, _]], RETURNS_SMART_NULLS) - val ord = implicitly[Ordering[Int]] - - // Numbers of partitions that are above and below the default bypassMergeThreshold - val FEW_PARTITIONS = 50 - val MANY_PARTITIONS = 10000 - - // Shuffles with no ordering or aggregator: should bypass unless # of partitions is high - assert(shouldBypassMergeSort(conf, FEW_PARTITIONS, None, None)) - assert(!shouldBypassMergeSort(conf, MANY_PARTITIONS, None, None)) - - // Shuffles with an ordering or aggregator: should not bypass even if they have few partitions - assert(!shouldBypassMergeSort(conf, FEW_PARTITIONS, None, Some(ord))) - assert(!shouldBypassMergeSort(conf, FEW_PARTITIONS, Some(agg), None)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala deleted file mode 100644 index 6727934..0000000 --- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.unsafe - -import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.scalatest.Matchers - -import org.apache.spark._ -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer} - -/** - * Tests for the fallback logic in UnsafeShuffleManager. Actual tests of shuffling data are - * performed in other suites. 
- */ -class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers { - - import UnsafeShuffleManager.canUseUnsafeShuffle - - private class RuntimeExceptionAnswer extends Answer[Object] { - override def answer(invocation: InvocationOnMock): Object = { - throw new RuntimeException("Called non-stubbed method, " + invocation.getMethod.getName) - } - } - - private def shuffleDep( - partitioner: Partitioner, - serializer: Option[Serializer], - keyOrdering: Option[Ordering[Any]], - aggregator: Option[Aggregator[Any, Any, Any]], - mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = { - val dep = mock(classOf[ShuffleDependency[Any, Any, Any]], new RuntimeExceptionAnswer()) - doReturn(0).when(dep).shuffleId - doReturn(partitioner).when(dep).partitioner - doReturn(serializer).when(dep).serializer - doReturn(keyOrdering).when(dep).keyOrdering - doReturn(aggregator).when(dep).aggregator - doReturn(mapSideCombine).when(dep).mapSideCombine - dep - } - - test("supported shuffle dependencies") { - val kryo = Some(new KryoSerializer(new SparkConf())) - - assert(canUseUnsafeShuffle(shuffleDep( - partitioner = new HashPartitioner(2), - serializer = kryo, - keyOrdering = None, - aggregator = None, - mapSideCombine = false - ))) - - val rangePartitioner = mock(classOf[RangePartitioner[Any, Any]]) - when(rangePartitioner.numPartitions).thenReturn(2) - assert(canUseUnsafeShuffle(shuffleDep( - partitioner = rangePartitioner, - serializer = kryo, - keyOrdering = None, - aggregator = None, - mapSideCombine = false - ))) - - // Shuffles with key orderings are supported as long as no aggregator is specified - assert(canUseUnsafeShuffle(shuffleDep( - partitioner = new HashPartitioner(2), - serializer = kryo, - keyOrdering = Some(mock(classOf[Ordering[Any]])), - aggregator = None, - mapSideCombine = false - ))) - - } - - test("unsupported shuffle dependencies") { - val kryo = Some(new KryoSerializer(new SparkConf())) - val java = Some(new JavaSerializer(new SparkConf())) - - // 
We only support serializers that support object relocation - assert(!canUseUnsafeShuffle(shuffleDep( - partitioner = new HashPartitioner(2), - serializer = java, - keyOrdering = None, - aggregator = None, - mapSideCombine = false - ))) - - // We do not support shuffles with more than 16 million output partitions - assert(!canUseUnsafeShuffle(shuffleDep( - partitioner = new HashPartitioner(UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS + 1), - serializer = kryo, - keyOrdering = None, - aggregator = None, - mapSideCombine = false - ))) - - // We do not support shuffles that perform aggregation - assert(!canUseUnsafeShuffle(shuffleDep( - partitioner = new HashPartitioner(2), - serializer = kryo, - keyOrdering = None, - aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])), - mapSideCombine = false - ))) - assert(!canUseUnsafeShuffle(shuffleDep( - partitioner = new HashPartitioner(2), - serializer = kryo, - keyOrdering = Some(mock(classOf[Ordering[Any]])), - aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])), - mapSideCombine = true - ))) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala deleted file mode 100644 index 259020a..0000000 --- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.unsafe - -import java.io.File - -import scala.collection.JavaConverters._ - -import org.apache.commons.io.FileUtils -import org.apache.commons.io.filefilter.TrueFileFilter -import org.scalatest.BeforeAndAfterAll - -import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkContext, ShuffleSuite} -import org.apache.spark.rdd.ShuffledRDD -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} -import org.apache.spark.util.Utils - -class UnsafeShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { - - // This test suite should run all tests in ShuffleSuite with unsafe-based shuffle. 
- - override def beforeAll() { - conf.set("spark.shuffle.manager", "tungsten-sort") - } - - test("UnsafeShuffleManager properly cleans up files for shuffles that use the new shuffle path") { - val tmpDir = Utils.createTempDir() - try { - val myConf = conf.clone() - .set("spark.local.dir", tmpDir.getAbsolutePath) - sc = new SparkContext("local", "test", myConf) - // Create a shuffled RDD and verify that it will actually use the new UnsafeShuffle path - val rdd = sc.parallelize(1 to 10, 1).map(x => (x, x)) - val shuffledRdd = new ShuffledRDD[Int, Int, Int](rdd, new HashPartitioner(4)) - .setSerializer(new KryoSerializer(myConf)) - val shuffleDep = shuffledRdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]] - assert(UnsafeShuffleManager.canUseUnsafeShuffle(shuffleDep)) - def getAllFiles: Set[File] = - FileUtils.listFiles(tmpDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet - val filesBeforeShuffle = getAllFiles - // Force the shuffle to be performed - shuffledRdd.count() - // Ensure that the shuffle actually created files that will need to be cleaned up - val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle - filesCreatedByShuffle.map(_.getName) should be - Set("shuffle_0_0_0.data", "shuffle_0_0_0.index") - // Check that the cleanup actually removes the files - sc.env.blockManager.master.removeShuffle(0, blocking = true) - for (file <- filesCreatedByShuffle) { - assert (!file.exists(), s"Shuffle file $file was not cleaned up") - } - } finally { - Utils.deleteRecursively(tmpDir) - } - } - - test("UnsafeShuffleManager properly cleans up files for shuffles that use the old shuffle path") { - val tmpDir = Utils.createTempDir() - try { - val myConf = conf.clone() - .set("spark.local.dir", tmpDir.getAbsolutePath) - sc = new SparkContext("local", "test", myConf) - // Create a shuffled RDD and verify that it will actually use the old SortShuffle path - val rdd = sc.parallelize(1 to 10, 1).map(x => (x, x)) - val shuffledRdd = new 
ShuffledRDD[Int, Int, Int](rdd, new HashPartitioner(4)) - .setSerializer(new JavaSerializer(myConf)) - val shuffleDep = shuffledRdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]] - assert(!UnsafeShuffleManager.canUseUnsafeShuffle(shuffleDep)) - def getAllFiles: Set[File] = - FileUtils.listFiles(tmpDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet - val filesBeforeShuffle = getAllFiles - // Force the shuffle to be performed - shuffledRdd.count() - // Ensure that the shuffle actually created files that will need to be cleaned up - val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle - filesCreatedByShuffle.map(_.getName) should be - Set("shuffle_0_0_0.data", "shuffle_0_0_0.index") - // Check that the cleanup actually removes the files - sc.env.blockManager.master.removeShuffle(0, blocking = true) - for (file <- filesCreatedByShuffle) { - assert (!file.exists(), s"Shuffle file $file was not cleaned up") - } - } finally { - Utils.deleteRecursively(tmpDir) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/util/collection/ChainedBufferSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/collection/ChainedBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ChainedBufferSuite.scala deleted file mode 100644 index 05306f4..0000000 --- a/core/src/test/scala/org/apache/spark/util/collection/ChainedBufferSuite.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util.collection - -import java.nio.ByteBuffer - -import org.scalatest.Matchers._ - -import org.apache.spark.SparkFunSuite - -class ChainedBufferSuite extends SparkFunSuite { - test("write and read at start") { - // write from start of source array - val buffer = new ChainedBuffer(8) - buffer.capacity should be (0) - verifyWriteAndRead(buffer, 0, 0, 0, 4) - buffer.capacity should be (8) - - // write from middle of source array - verifyWriteAndRead(buffer, 0, 5, 0, 4) - buffer.capacity should be (8) - - // read to middle of target array - verifyWriteAndRead(buffer, 0, 0, 5, 4) - buffer.capacity should be (8) - - // write up to border - verifyWriteAndRead(buffer, 0, 0, 0, 8) - buffer.capacity should be (8) - - // expand into second buffer - verifyWriteAndRead(buffer, 0, 0, 0, 12) - buffer.capacity should be (16) - - // expand into multiple buffers - verifyWriteAndRead(buffer, 0, 0, 0, 28) - buffer.capacity should be (32) - } - - test("write and read at middle") { - val buffer = new ChainedBuffer(8) - - // fill to a middle point - verifyWriteAndRead(buffer, 0, 0, 0, 3) - - // write from start of source array - verifyWriteAndRead(buffer, 3, 0, 0, 4) - buffer.capacity should be (8) - - // write from middle of source array - verifyWriteAndRead(buffer, 3, 5, 0, 4) - buffer.capacity should be (8) - - // read to middle of target array - verifyWriteAndRead(buffer, 3, 0, 5, 4) - buffer.capacity should be (8) - - // write up to border - verifyWriteAndRead(buffer, 3, 0, 0, 5) - buffer.capacity should be (8) - - // expand into second 
buffer - verifyWriteAndRead(buffer, 3, 0, 0, 12) - buffer.capacity should be (16) - - // expand into multiple buffers - verifyWriteAndRead(buffer, 3, 0, 0, 28) - buffer.capacity should be (32) - } - - test("write and read at later buffer") { - val buffer = new ChainedBuffer(8) - - // fill to a middle point - verifyWriteAndRead(buffer, 0, 0, 0, 11) - - // write from start of source array - verifyWriteAndRead(buffer, 11, 0, 0, 4) - buffer.capacity should be (16) - - // write from middle of source array - verifyWriteAndRead(buffer, 11, 5, 0, 4) - buffer.capacity should be (16) - - // read to middle of target array - verifyWriteAndRead(buffer, 11, 0, 5, 4) - buffer.capacity should be (16) - - // write up to border - verifyWriteAndRead(buffer, 11, 0, 0, 5) - buffer.capacity should be (16) - - // expand into second buffer - verifyWriteAndRead(buffer, 11, 0, 0, 12) - buffer.capacity should be (24) - - // expand into multiple buffers - verifyWriteAndRead(buffer, 11, 0, 0, 28) - buffer.capacity should be (40) - } - - - // Used to make sure we're writing different bytes each time - var rangeStart = 0 - - /** - * @param buffer The buffer to write to and read from. - * @param offsetInBuffer The offset to write to in the buffer. - * @param offsetInSource The offset in the array that the bytes are written from. - * @param offsetInTarget The offset in the array to read the bytes into. 
- * @param length The number of bytes to read and write - */ - def verifyWriteAndRead( - buffer: ChainedBuffer, - offsetInBuffer: Int, - offsetInSource: Int, - offsetInTarget: Int, - length: Int): Unit = { - val source = new Array[Byte](offsetInSource + length) - (rangeStart until rangeStart + length).map(_.toByte).copyToArray(source, offsetInSource) - buffer.write(offsetInBuffer, source, offsetInSource, length) - val target = new Array[Byte](offsetInTarget + length) - buffer.read(offsetInBuffer, target, offsetInTarget, length) - ByteBuffer.wrap(source, offsetInSource, length) should be - (ByteBuffer.wrap(target, offsetInTarget, length)) - - rangeStart += 100 - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala deleted file mode 100644 index 3b67f62..0000000 --- a/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util.collection - -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} - -import com.google.common.io.ByteStreams - -import org.mockito.Matchers.any -import org.mockito.Mockito._ -import org.mockito.Mockito.RETURNS_SMART_NULLS -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.scalatest.Matchers._ - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.serializer.KryoSerializer -import org.apache.spark.storage.DiskBlockObjectWriter - -class PartitionedSerializedPairBufferSuite extends SparkFunSuite { - test("OrderedInputStream single record") { - val serializerInstance = new KryoSerializer(new SparkConf()).newInstance - - val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance) - val struct = SomeStruct("something", 5) - buffer.insert(4, 10, struct) - - val bytes = ByteStreams.toByteArray(buffer.orderedInputStream) - - val baos = new ByteArrayOutputStream() - val stream = serializerInstance.serializeStream(baos) - stream.writeObject(10) - stream.writeObject(struct) - stream.close() - - baos.toByteArray should be (bytes) - } - - test("insert single record") { - val serializerInstance = new KryoSerializer(new SparkConf()).newInstance - val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance) - val struct = SomeStruct("something", 5) - buffer.insert(4, 10, struct) - val elements = buffer.partitionedDestructiveSortedIterator(None).toArray - elements.size should be (1) - elements.head should be (((4, 10), struct)) - } - - test("insert multiple records") { - val serializerInstance = new KryoSerializer(new SparkConf()).newInstance - val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance) - val struct1 = SomeStruct("something1", 8) - buffer.insert(6, 1, 
struct1) - val struct2 = SomeStruct("something2", 9) - buffer.insert(4, 2, struct2) - val struct3 = SomeStruct("something3", 10) - buffer.insert(5, 3, struct3) - - val elements = buffer.partitionedDestructiveSortedIterator(None).toArray - elements.size should be (3) - elements(0) should be (((4, 2), struct2)) - elements(1) should be (((5, 3), struct3)) - elements(2) should be (((6, 1), struct1)) - } - - test("write single record") { - val serializerInstance = new KryoSerializer(new SparkConf()).newInstance - val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance) - val struct = SomeStruct("something", 5) - buffer.insert(4, 10, struct) - val it = buffer.destructiveSortedWritablePartitionedIterator(None) - val (writer, baos) = createMockWriter() - assert(it.hasNext) - it.nextPartition should be (4) - it.writeNext(writer) - assert(!it.hasNext) - - val stream = serializerInstance.deserializeStream(new ByteArrayInputStream(baos.toByteArray)) - stream.readObject[AnyRef]() should be (10) - stream.readObject[AnyRef]() should be (struct) - } - - test("write multiple records") { - val serializerInstance = new KryoSerializer(new SparkConf()).newInstance - val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance) - val struct1 = SomeStruct("something1", 8) - buffer.insert(6, 1, struct1) - val struct2 = SomeStruct("something2", 9) - buffer.insert(4, 2, struct2) - val struct3 = SomeStruct("something3", 10) - buffer.insert(5, 3, struct3) - - val it = buffer.destructiveSortedWritablePartitionedIterator(None) - val (writer, baos) = createMockWriter() - assert(it.hasNext) - it.nextPartition should be (4) - it.writeNext(writer) - assert(it.hasNext) - it.nextPartition should be (5) - it.writeNext(writer) - assert(it.hasNext) - it.nextPartition should be (6) - it.writeNext(writer) - assert(!it.hasNext) - - val stream = serializerInstance.deserializeStream(new ByteArrayInputStream(baos.toByteArray)) - val iter = 
stream.asIterator - iter.next() should be (2) - iter.next() should be (struct2) - iter.next() should be (3) - iter.next() should be (struct3) - iter.next() should be (1) - iter.next() should be (struct1) - assert(!iter.hasNext) - } - - def createMockWriter(): (DiskBlockObjectWriter, ByteArrayOutputStream) = { - val writer = mock(classOf[DiskBlockObjectWriter], RETURNS_SMART_NULLS) - val baos = new ByteArrayOutputStream() - when(writer.write(any(), any(), any())).thenAnswer(new Answer[Unit] { - override def answer(invocationOnMock: InvocationOnMock): Unit = { - val args = invocationOnMock.getArguments - val bytes = args(0).asInstanceOf[Array[Byte]] - val offset = args(1).asInstanceOf[Int] - val length = args(2).asInstanceOf[Int] - baos.write(bytes, offset, length) - } - }) - (writer, baos) - } -} - -case class SomeStruct(str: String, num: Int) http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 46d92ce..be9c36b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -437,12 +437,9 @@ Apart from these, the following properties are also available, and may be useful <td><code>spark.shuffle.manager</code></td> <td>sort</td> <td> - Implementation to use for shuffling data. There are three implementations available: - <code>sort</code>, <code>hash</code> and the new (1.5+) <code>tungsten-sort</code>. + Implementation to use for shuffling data. There are two implementations available: + <code>sort</code> and <code>hash</code>. Sort-based shuffle is more memory-efficient and is the default option starting in 1.2. - Tungsten-sort is similar to the sort based shuffle, with a direct binary cache-friendly - implementation with a fall back to regular sort based shuffle if its requirements are not - met. 
</td> </tr> <tr> http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 0872d3f..b5e661d 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -37,6 +37,7 @@ object MimaExcludes { Seq( MimaBuild.excludeSparkPackage("deploy"), MimaBuild.excludeSparkPackage("network"), + MimaBuild.excludeSparkPackage("unsafe"), // These are needed if checking against the sbt build, since they are part of // the maven-generated artifacts in 1.3. excludePackage("org.spark-project.jetty"), @@ -44,7 +45,11 @@ object MimaExcludes { // SQL execution is considered private. excludePackage("org.apache.spark.sql.execution"), // SQL columnar is considered private. - excludePackage("org.apache.spark.sql.columnar") + excludePackage("org.apache.spark.sql.columnar"), + // The shuffle package is considered private. + excludePackage("org.apache.spark.shuffle"), + // The collections utilities are considered private. 
+ excludePackage("org.apache.spark.util.collection") ) ++ MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++ MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++ @@ -750,4 +755,4 @@ object MimaExcludes { MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD") case _ => Seq() } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 1d3379a..7f60c8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors.attachTree @@ -87,10 +86,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una // fewer partitions (like RangePartitioner, for example). 
val conf = child.sqlContext.sparkContext.conf val shuffleManager = SparkEnv.get.shuffleManager - val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] || - shuffleManager.isInstanceOf[UnsafeShuffleManager] + val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) - val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) if (sortBasedShuffleOn) { val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) { @@ -99,22 +96,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una // doesn't buffer deserialized records. // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass. false - } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) { - // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting - // them. This optimization is guarded by a feature-flag and is only applied in cases where - // shuffle dependency does not specify an aggregator or ordering and the record serializer - // has certain properties. If this optimization is enabled, we can safely avoid the copy. + } else if (serializer.supportsRelocationOfSerializedObjects) { + // SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records + // prior to sorting them. This optimization is only applied in cases where shuffle + // dependency does not specify an aggregator or ordering and the record serializer has + // certain properties. If this optimization is enabled, we can safely avoid the copy. // // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only // need to check whether the optimization is enabled and supported by our serializer. 
- // - // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081). false } else { - // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory. This code - // path is used both when SortShuffleManager is used and when UnsafeShuffleManager falls - // back to SortShuffleManager to perform a shuffle that the new fast path can't handle. In - // both cases, we must copy. + // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must + // copy. true } } else if (shuffleManager.isInstanceOf[HashShuffleManager]) { http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 75d1fce..1680d7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -101,7 +101,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be overwritten Utils.tryWithSafeFinally { val conf = new SparkConf() - .set("spark.shuffle.spill.initialMemoryThreshold", "1024") + .set("spark.shuffle.spill.initialMemoryThreshold", "1") .set("spark.shuffle.sort.bypassMergeThreshold", "0") .set("spark.testing.memory", "80000") @@ -109,7 +109,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "") // prepare data val converter = unsafeRowConverter(Array(IntegerType)) - val data = (1 to 1000).iterator.map { i => + val data = (1 to 10000).iterator.map { i => (i, converter(Row(i))) } 
val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow]( @@ -141,9 +141,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { } } - test("SPARK-10403: unsafe row serializer with UnsafeShuffleManager") { - val conf = new SparkConf() - .set("spark.shuffle.manager", "tungsten-sort") + test("SPARK-10403: unsafe row serializer with SortShuffleManager") { + val conf = new SparkConf().set("spark.shuffle.manager", "sort") sc = new SparkContext("local", "test", conf) val row = Row("Hello", 123) val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org