This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new fc5342314c8d  [SPARK-44126][CORE] Shuffle migration failure count should not increase when target executor decommissioned
fc5342314c8d is described below

commit fc5342314c8d0890cf97a808bcf5fdf3720a5864
Author: Warren Zhu <warren.zh...@gmail.com>
AuthorDate: Tue Sep 26 10:52:40 2023 +0800

    [SPARK-44126][CORE] Shuffle migration failure count should not increase when target executor decommissioned

    ### What changes were proposed in this pull request?
    Do not increase the shuffle migration failure count when the target executor has been decommissioned.

    ### Why are the changes needed?
    The block manager decommissioner refreshes its list of live peers from the block manager master only every `spark.storage.cachedPeersTtl` (default 60s). If a peer block manager is decommissioned within that interval, the decommissioner may still try to migrate shuffle blocks to it. Such a migration fails with a `RuntimeException` wrapping `BlockSavedOnDecommissionedBlockManagerException`. Detailed stack trace:

    ```
    org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
        at org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:122)
        at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$5(BlockManagerDecommissioner.scala:127)
        at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$5$adapted(BlockManagerDecommissioner.scala:118)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:118)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.RuntimeException: org.apache.spark.storage.BlockSavedOnDecommissionedBlockManagerException: Block shuffle_2_6429_0.data cannot be saved on decommissioned executor
        at org.apache.spark.errors.SparkCoreErrors$.cannotSaveBlockOnDecommissionedExecutorError(SparkCoreErrors.scala:238)
        at org.apache.spark.storage.BlockManager.checkShouldStore(BlockManager.scala:277)
        at org.apache.spark.storage.BlockManager.putBlockDataAsStream(BlockManager.scala:741)
        at org.apache.spark.network.netty.NettyBlockRpcServer.receiveStream(NettyBlockRpcServer.scala:174)
    ```

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added UT in `BlockManagerDecommissionUnitSuite`

    Closes #41905 from warrenzhu25/migrate-decom.

    Authored-by: Warren Zhu <warren.zh...@gmail.com>
    Signed-off-by: Yi Wu <yi...@databricks.com>
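Both settings the description reasons about are ordinary Spark configs. A minimal sketch of setting them explicitly, assuming the string keys correspond to the `config` constants used in the diff below (the values shown are the defaults, purely for illustration):

```
import org.apache.spark.SparkConf

// Illustrative only: the decommissioner works from a peer list that is
// refreshed every cachedPeersTtl, and each shuffle block gets a bounded
// number of migration failures before it is given up on.
val conf = new SparkConf()
  .set("spark.storage.cachedPeersTtl", "60000") // milliseconds (60s default)
  .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")
```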
---
 .../spark/storage/BlockManagerDecommissioner.scala | 16 ++++++++-
 .../storage/BlockManagerDecommissionUnitSuite.scala | 40 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 59d1f3b4c4ba..cbac3fd1a994 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -43,6 +43,8 @@ private[storage] class BlockManagerDecommissioner(
   private val fallbackStorage = FallbackStorage.getFallbackStorage(conf)
   private val maxReplicationFailuresForDecommission =
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+  private val blockSavedOnDecommissionedBlockManagerException =
+    classOf[BlockSavedOnDecommissionedBlockManagerException].getSimpleName
 
   // Used for tracking if our migrations are complete. Readable for testing
   @volatile private[storage] var lastRDDMigrationTime: Long = 0
@@ -101,6 +103,7 @@ private[storage] class BlockManagerDecommissioner(
         try {
           val (shuffleBlockInfo, retryCount) = nextShuffleBlockToMigrate()
           val blocks = bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+          var isTargetDecommissioned = false
           // We only migrate a shuffle block when both index file and data file exist.
           if (blocks.isEmpty) {
             logInfo(s"Ignore deleted shuffle block $shuffleBlockInfo")
@@ -143,6 +146,11 @@ private[storage] class BlockManagerDecommissioner(
                   // have been used in the try-block above so there's no point trying again
                   && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
                   fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                } else if (e.getCause != null && e.getCause.getMessage != null
+                  && e.getCause.getMessage
+                    .contains(blockSavedOnDecommissionedBlockManagerException)) {
+                  isTargetDecommissioned = true
+                  keepRunning = false
                 } else {
                   logError(s"Error occurred during migrating $shuffleBlockInfo", e)
                   keepRunning = false
@@ -156,8 +164,14 @@ private[storage] class BlockManagerDecommissioner(
             numMigratedShuffles.incrementAndGet()
           } else {
             logWarning(s"Stop migrating shuffle blocks to $peer")
+
+            val newRetryCount = if (isTargetDecommissioned) {
+              retryCount
+            } else {
+              retryCount + 1
+            }
             // Do not mark the block as migrated if it still needs retry
-            if (!allowRetry(shuffleBlockInfo, retryCount + 1)) {
+            if (!allowRetry(shuffleBlockInfo, newRetryCount)) {
               numMigratedShuffles.incrementAndGet()
             }
           }
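Note why the new branch matches on a string rather than the exception type: as the stack trace in the description shows, the server-side `BlockSavedOnDecommissionedBlockManagerException` reaches the migrating executor re-wrapped in a plain `java.lang.RuntimeException`, with the original class name surviving only in the message text. A minimal standalone sketch of that detection; the helper name is hypothetical, and the patch inlines the same check:

```
// Hypothetical helper, not part of the patch. The patch derives the marker
// via classOf[BlockSavedOnDecommissionedBlockManagerException].getSimpleName;
// a string literal is used here to keep the sketch self-contained.
object DecommissionedTargetDetector {
  private val Marker = "BlockSavedOnDecommissionedBlockManagerException"

  // True when the upload failure's cause carries the remote block manager's
  // decommission error, i.e. the chosen target (not this block) is the problem.
  def targetWasDecommissioned(e: Throwable): Boolean = {
    val cause = e.getCause
    cause != null && cause.getMessage != null && cause.getMessage.contains(Marker)
  }
}
```

Leaving `retryCount` unchanged in this case means a block is never charged a replication failure merely because the chosen peer was already gone; on the next attempt, against a refreshed peer list, the block still has its full `maxReplicationFailuresForDecommission` budget.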
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index 67a4514d5bda..b7ad6722faa8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -231,6 +231,46 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
       numShuffles = Option(1))
   }
 
+  test("SPARK-44126: block decom manager handles BlockSavedOnDecommissionedBlockManagerException") {
+    // Set up the mocks so we return one shuffle block
+    val conf = sparkConf
+      .clone
+      .set(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK, 1)
+    val bm = mock(classOf[BlockManager])
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq())
+    val exe1 = BlockManagerId("exec1", "host1", 12345)
+    val exe2 = BlockManagerId("exec2", "host2", 12345)
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(exe1), Seq(exe1), Seq(exe2))
+
+    val blockTransferService = mock(classOf[BlockTransferService])
+    // Simulate BlockSavedOnDecommissionedBlockManagerException
+    when(blockTransferService.uploadBlock(
+      mc.any(), mc.any(), mc.eq(exe1.executorId), mc.any(), mc.any(), mc.any(), mc.isNull()))
+      .thenReturn(
+        Future.failed(new RuntimeException("BlockSavedOnDecommissionedBlockManagerException"))
+      )
+    when(blockTransferService.uploadBlockSync(
+      mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+      .thenCallRealMethod()
+
+    when(bm.blockTransferService).thenReturn(blockTransferService)
+
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    validateDecommissionTimestampsOnManager(bmDecomManager)
+    verify(blockTransferService, times(1))
+      .uploadBlock(mc.any(), mc.any(), mc.eq(exe1.executorId),
+        mc.any(), mc.any(), mc.any(), mc.isNull())
+    verify(blockTransferService, times(1))
+      .uploadBlock(mc.any(), mc.any(), mc.eq(exe2.executorId),
+        mc.any(), mc.any(), mc.any(), mc.isNull())
+  }
+
   test("block decom manager handles IO failures") {
     // Set up the mocks so we return one shuffle block
     val bm = mock(classOf[BlockManager])
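A side note on the test's mock setup: Mockito's varargs `thenReturn` stubs successive calls, which is what lets the stubbed `getPeers` steer the decommissioner to the dead peer first and to a healthy one afterwards. A self-contained sketch of that behavior, with all names illustrative rather than taken from the suite:

```
import org.mockito.Mockito.{mock, when}

object ConsecutiveStubbingDemo {
  // Minimal stand-in for the peer lookup the suite mocks on BlockManager.
  trait PeerSource { def peers(): Seq[String] }

  def main(args: Array[String]): Unit = {
    val src = mock(classOf[PeerSource])
    // Each argument answers one successive call; the last one repeats forever.
    when(src.peers()).thenReturn(Seq("exec1"), Seq("exec1"), Seq("exec2"))

    assert(src.peers() == Seq("exec1")) // first call
    assert(src.peers() == Seq("exec1")) // second call
    assert(src.peers() == Seq("exec2")) // third call and every call after
  }
}
```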