This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fc5342314c8d [SPARK-44126][CORE] Shuffle migration failure count 
should not increase when target executor decommissioned
fc5342314c8d is described below

commit fc5342314c8d0890cf97a808bcf5fdf3720a5864
Author: Warren Zhu <warren.zh...@gmail.com>
AuthorDate: Tue Sep 26 10:52:40 2023 +0800

    [SPARK-44126][CORE] Shuffle migration failure count should not increase 
when target executor decommissioned
    
    ### What changes were proposed in this pull request?
    Do not increase shuffle migration failure count when target executor 
decommissioned
    
    ### Why are the changes needed?
    Block manager decommissioner only sync with block manager master about live 
peers every `spark.storage.cachedPeersTtl`(default 60s). If some block manager 
decommissioned between this, it still try to migrated shuffle to such 
decommissioned block manger. The migration will be failed with 
RuntimeException("BlockSavedOnDecommissionedBlockManagerException"). Detailed 
stack trace as below:
    
    ```
    org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
        at 
org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:122)
        at 
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$5(BlockManagerDecommissioner.scala:127)
        at 
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$5$adapted(BlockManagerDecommissioner.scala:118)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:118)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.RuntimeException: 
org.apache.spark.storage.BlockSavedOnDecommissionedBlockManagerException: Block 
shuffle_2_6429_0.data cannot be saved on decommissioned executor
        at 
org.apache.spark.errors.SparkCoreErrors$.cannotSaveBlockOnDecommissionedExecutorError(SparkCoreErrors.scala:238)
    at 
org.apache.spark.storage.BlockManager.checkShouldStore(BlockManager.scala:277)
        at 
org.apache.spark.storage.BlockManager.putBlockDataAsStream(BlockManager.scala:741)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer.receiveStream(NettyBlockRpcServer.scala:174)
    
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT in `BlockManagerDecommissionUnitSuite`
    
    Closes #41905 from warrenzhu25/migrate-decom.
    
    Authored-by: Warren Zhu <warren.zh...@gmail.com>
    Signed-off-by: Yi Wu <yi...@databricks.com>
---
 .../spark/storage/BlockManagerDecommissioner.scala | 16 ++++++++-
 .../BlockManagerDecommissionUnitSuite.scala        | 40 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 59d1f3b4c4ba..cbac3fd1a994 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -43,6 +43,8 @@ private[storage] class BlockManagerDecommissioner(
   private val fallbackStorage = FallbackStorage.getFallbackStorage(conf)
   private val maxReplicationFailuresForDecommission =
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+  private val blockSavedOnDecommissionedBlockManagerException =
+    classOf[BlockSavedOnDecommissionedBlockManagerException].getSimpleName
 
   // Used for tracking if our migrations are complete. Readable for testing
   @volatile private[storage] var lastRDDMigrationTime: Long = 0
@@ -101,6 +103,7 @@ private[storage] class BlockManagerDecommissioner(
         try {
           val (shuffleBlockInfo, retryCount) = nextShuffleBlockToMigrate()
           val blocks = 
bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+          var isTargetDecommissioned = false
           // We only migrate a shuffle block when both index file and data 
file exist.
           if (blocks.isEmpty) {
             logInfo(s"Ignore deleted shuffle block $shuffleBlockInfo")
@@ -143,6 +146,11 @@ private[storage] class BlockManagerDecommissioner(
                     // have been used in the try-block above so there's no 
point trying again
                     && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
                   fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                } else if (e.getCause != null && e.getCause.getMessage != null
+                  && e.getCause.getMessage
+                  .contains(blockSavedOnDecommissionedBlockManagerException)) {
+                  isTargetDecommissioned = true
+                  keepRunning = false
                 } else {
                   logError(s"Error occurred during migrating 
$shuffleBlockInfo", e)
                   keepRunning = false
@@ -156,8 +164,14 @@ private[storage] class BlockManagerDecommissioner(
             numMigratedShuffles.incrementAndGet()
           } else {
             logWarning(s"Stop migrating shuffle blocks to $peer")
+
+            val newRetryCount = if (isTargetDecommissioned) {
+              retryCount
+            } else {
+              retryCount + 1
+            }
             // Do not mark the block as migrated if it still needs retry
-            if (!allowRetry(shuffleBlockInfo, retryCount + 1)) {
+            if (!allowRetry(shuffleBlockInfo, newRetryCount)) {
               numMigratedShuffles.incrementAndGet()
             }
           }
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index 67a4514d5bda..b7ad6722faa8 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -231,6 +231,46 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
       numShuffles = Option(1))
   }
 
+  test("SPARK-44126: block decom manager handles 
BlockSavedOnDecommissionedBlockManagerException") {
+    // Set up the mocks so we return one shuffle block
+    val conf = sparkConf
+      .clone
+      .set(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK, 1)
+    val bm = mock(classOf[BlockManager])
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq())
+    val exe1 = BlockManagerId("exec1", "host1", 12345)
+    val exe2 = BlockManagerId("exec2", "host2", 12345)
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(exe1), Seq(exe1), Seq(exe2))
+
+    val blockTransferService = mock(classOf[BlockTransferService])
+    // Simulate BlockSavedOnDecommissionedBlockManagerException
+    when(blockTransferService.uploadBlock(
+      mc.any(), mc.any(), mc.eq(exe1.executorId), mc.any(), mc.any(), 
mc.any(), mc.isNull()))
+      .thenReturn(
+        Future.failed(new 
RuntimeException("BlockSavedOnDecommissionedBlockManagerException"))
+      )
+    when(blockTransferService.uploadBlockSync(
+      mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+      .thenCallRealMethod()
+
+    when(bm.blockTransferService).thenReturn(blockTransferService)
+
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    validateDecommissionTimestampsOnManager(bmDecomManager)
+    verify(blockTransferService, times(1))
+      .uploadBlock(mc.any(), mc.any(), mc.eq(exe1.executorId),
+        mc.any(), mc.any(), mc.any(), mc.isNull())
+    verify(blockTransferService, times(1))
+      .uploadBlock(mc.any(), mc.any(), mc.eq(exe2.executorId),
+        mc.any(), mc.any(), mc.any(), mc.isNull())
+  }
+
   test("block decom manager handles IO failures") {
     // Set up the mocks so we return one shuffle block
     val bm = mock(classOf[BlockManager])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to