This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2b51843 [SPARK-34363][CORE] Add an option for limiting storage for migrated shuffle blocks 2b51843 is described below commit 2b51843ca41236f8cec29c406ea35ce1088364cf Author: Holden Karau <hka...@apple.com> AuthorDate: Tue Feb 9 10:21:56 2021 -0800 [SPARK-34363][CORE] Add an option for limiting storage for migrated shuffle blocks ### What changes were proposed in this pull request? Allow users to configure a maximum amount of shuffle blocks to be stored and reject remote shuffle blocks when this threshold is exceeded. ### Why are the changes needed? In disk-constrained environments with a large amount of shuffle data, migrations may result in excessive disk pressure on the nodes. On Kubernetes nodes this can result in cascading failures when combined with `emptyDir`. ### Does this PR introduce _any_ user-facing change? Yes, a new configuration parameter. ### How was this patch tested? New unit tests. Closes #31493 from holdenk/SPARK-34337-reject-disk-blocks-when-under-disk-pressure. 
Lead-authored-by: Holden Karau <hka...@apple.com> Co-authored-by: Holden Karau <hol...@pigscanfly.ca> Signed-off-by: Holden Karau <hka...@apple.com> --- .../org/apache/spark/internal/config/package.scala | 11 ++++++ .../spark/shuffle/IndexShuffleBlockResolver.scala | 20 +++++++++- .../apache/spark/shuffle/MigratableResolver.scala | 1 + .../apache/spark/storage/BlockManagerSuite.scala | 46 ++++++++++++++++------ 4 files changed, 65 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 1afad30..7aeb51d 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -488,6 +488,17 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE = + ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxDiskSize") + .doc("Maximum disk space to use to store shuffle blocks before rejecting remote " + + "shuffle blocks. 
Rejecting remote shuffle blocks means that an executor will not receive " + + "any shuffle migrations, and if there are no other executors available for migration " + + "then shuffle blocks will be lost unless " + + s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} is configured.") + .version("3.2.0") + .bytesConf(ByteUnit.BYTE) + .createOptional + private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE = ConfigBuilder("spark.storage.replication.topologyFile") .version("2.1.0") diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 5f0bb42..d30b73a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -22,8 +22,8 @@ import java.nio.ByteBuffer import java.nio.channels.Channels import java.nio.file.Files -import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.{SparkConf, SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} import org.apache.spark.io.NioBufferedFileInputStream import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.client.StreamCallbackWithID @@ -56,6 +56,8 @@ private[spark] class IndexShuffleBlockResolver( private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle") + private val remoteShuffleMaxDisk: Option[Long] = + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE) def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None) @@ -72,6 +74,13 @@ private[spark] class IndexShuffleBlockResolver( } } + private def getShuffleBytesStored(): Long = { + val shuffleFiles: Seq[File] = getStoredShuffles().map { + si => getDataFile(si.shuffleId, si.mapId) + } + shuffleFiles.map(_.length()).sum + } + /** * Get the shuffle data file. 
* @@ -173,6 +182,13 @@ private[spark] class IndexShuffleBlockResolver( */ override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): StreamCallbackWithID = { + // Throw an exception if we have exceeded maximum shuffle files stored + remoteShuffleMaxDisk.foreach { maxBytes => + val bytesUsed = getShuffleBytesStored() + if (maxBytes < bytesUsed) { + throw new SparkException(s"Not storing remote shuffles $bytesUsed exceeds $maxBytes") + } + } val file = blockId match { case ShuffleIndexBlockId(shuffleId, mapId, _) => getIndexFile(shuffleId, mapId) diff --git a/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala index 3851fa6..9908281 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala @@ -37,6 +37,7 @@ trait MigratableResolver { /** * Write a provided shuffle block as a stream. Used for block migrations. 
+ * Up to the implementation to support STORAGE_REMOTE_SHUFFLE_MAX_DISK */ def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): StreamCallbackWithID diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 09678c7..82d7abf 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -103,8 +103,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE .set(PUSH_BASED_SHUFFLE_ENABLED, true) } - private def makeSortShuffleManager(): SortShuffleManager = { - val newMgr = new SortShuffleManager(new SparkConf(false)) + private def makeSortShuffleManager(conf: Option[SparkConf] = None): SortShuffleManager = { + val newMgr = new SortShuffleManager(conf.getOrElse(new SparkConf(false))) sortShuffleManagers += newMgr newMgr } @@ -1932,22 +1932,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId)) } - test("test migration of shuffle blocks during decommissioning") { - val shuffleManager1 = makeSortShuffleManager() + private def testShuffleBlockDecommissioning(maxShuffleSize: Option[Int], willReject: Boolean) = { + maxShuffleSize.foreach{ size => + conf.set(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.key, s"${size}b") + } + val shuffleManager1 = makeSortShuffleManager(Some(conf)) val bm1 = makeBlockManager(3500, "exec1", shuffleManager = shuffleManager1) shuffleManager1.shuffleBlockResolver._blockManager = bm1 - val shuffleManager2 = makeSortShuffleManager() + val shuffleManager2 = makeSortShuffleManager(Some(conf)) val bm2 = makeBlockManager(3500, "exec2", shuffleManager = shuffleManager2) shuffleManager2.shuffleBlockResolver._blockManager = bm2 val blockSize = 5 val shuffleDataBlockContent = Array[Byte](0, 1, 2, 3, 
4) val shuffleData = ShuffleDataBlockId(0, 0, 0) + val shuffleData2 = ShuffleDataBlockId(1, 0, 0) Files.write(bm1.diskBlockManager.getFile(shuffleData).toPath(), shuffleDataBlockContent) + Files.write(bm2.diskBlockManager.getFile(shuffleData2).toPath(), shuffleDataBlockContent) val shuffleIndexBlockContent = Array[Byte](5, 6, 7, 8, 9) val shuffleIndex = ShuffleIndexBlockId(0, 0, 0) + val shuffleIndex2 = ShuffleIndexBlockId(1, 0, 0) Files.write(bm1.diskBlockManager.getFile(shuffleIndex).toPath(), shuffleIndexBlockContent) + Files.write(bm2.diskBlockManager.getFile(shuffleIndex2).toPath(), shuffleIndexBlockContent) mapOutputTracker.registerShuffle(0, 1) val decomManager = new BlockManagerDecommissioner(conf, bm1) @@ -1961,13 +1968,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE decomManager.refreshOffloadingShuffleBlocks() - eventually(timeout(1.second), interval(10.milliseconds)) { - assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === bm2.blockManagerId) + if (willReject) { + eventually(timeout(1.second), interval(10.milliseconds)) { + assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === bm2.blockManagerId) + } + assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath()) + === shuffleDataBlockContent) + assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath()) + === shuffleIndexBlockContent) + } else { + Thread.sleep(1000) + assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === bm1.blockManagerId) } - assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath()) - === shuffleDataBlockContent) - assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath()) - === shuffleIndexBlockContent) } finally { mapOutputTracker.unregisterShuffle(0) // Avoid thread leak @@ -1975,6 +1987,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } + test("test migration of shuffle blocks 
during decommissioning - no limit") { + testShuffleBlockDecommissioning(None, true) + } + + test("test migration of shuffle blocks during decommissioning - larger limit") { + testShuffleBlockDecommissioning(Some(10000), true) + } + + test("[SPARK-34363]test migration of shuffle blocks during decommissioning - small limit") { + testShuffleBlockDecommissioning(Some(1), false) + } + test("SPARK-32919: Shuffle push merger locations should be bounded with in" + " spark.shuffle.push.retainedMergerLocations") { assert(master.getShufflePushMergerLocations(10, Set.empty).isEmpty) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org