This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5e417ff8b14 [SPARK-44547][CORE] Ignore fallback storage for cached RDD migration
5e417ff8b14 is described below

commit 5e417ff8b14764d28ccc9561cb268de83b15eff2
Author: Frank Yin <fra...@ziprecruiter.com>
AuthorDate: Thu Aug 24 22:19:41 2023 -0700

    [SPARK-44547][CORE] Ignore fallback storage for cached RDD migration
    
    ### What changes were proposed in this pull request?
    
    Fix a bug that makes the RDD decommissioner never finish.
    
    ### Why are the changes needed?
    
    The cached RDD decommissioner retries forever when the only viable peer is the fallback storage, which it does not know how to handle (a minimal sketch of the fixed check appears after the diff below).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No

    ### How was this patch tested?
    
    Unit tests are added, and the fix was verified by running Spark jobs (the verification idiom used by the new test is sketched after the diff below).
    
    Closes #42155 from ukby1234/franky.SPARK-44547.
    
    Authored-by: Frank Yin <fra...@ziprecruiter.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 47555da2ae292b07488ba181db1aceac8e7ddb3a)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/storage/BlockManagerDecommissioner.scala |  4 +--
 .../BlockManagerDecommissionUnitSuite.scala        | 35 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index ecd64b6695a..32df6388cab 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -184,7 +184,7 @@ private[storage] class BlockManagerDecommissioner(
 
   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
-  @volatile private var stoppedRDD =
+  @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
   @volatile private var stoppedShuffle =
     !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
@@ -204,7 +204,7 @@ private[storage] class BlockManagerDecommissioner(
       logInfo("Attempting to migrate all RDD blocks")
       while (!stopped && !stoppedRDD) {
         // Validate if we have peers to migrate to. Otherwise, give up migration.
-        if (bm.getPeers(false).isEmpty) {
+        if (!bm.getPeers(false).exists(_ != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)) {
           logWarning("No available peers to receive RDD blocks, stop migration.")
           stoppedRDD = true
         } else {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index b7ac378b4c6..f9263f41060 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import scala.concurrent.duration._
 
 import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, never, times, verify, when}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.matchers.must.Matchers
 
@@ -305,4 +305,37 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
         bmDecomManager.stop()
     }
   }
+
+  test("SPARK-44547: test cached rdd migration no available hosts") {
+    val blockTransferService = mock(classOf[BlockTransferService])
+    val bm = mock(classOf[BlockManager])
+
+    val storedBlockId1 = RDDBlockId(0, 0)
+    val storedBlock1 =
+      new ReplicateBlock(storedBlockId1, Seq(BlockManagerId("replicaHolder", "host1", bmPort)), 1)
+
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    registerShuffleBlocks(migratableShuffleBlockResolver, Set())
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
+
+    when(bm.blockTransferService).thenReturn(blockTransferService)
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq(storedBlock1))
+
+    val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
+
+    try {
+      bmDecomManager.start()
+      eventually(timeout(100.second), interval(10.milliseconds)) {
+        verify(bm, never()).replicateBlock(
+          mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3)))
+        assert(bmDecomManager.rddBlocksLeft)
+        assert(bmDecomManager.stoppedRDD)
+      }
+    } finally {
+      bmDecomManager.stop()
+    }
+  }
 }
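
A minimal, self-contained Scala sketch of the fixed peer check referenced in the "Why are the changes needed?" section above. The names PeerCheckSketch, FallbackId, hasAnyPeer, and hasViablePeer are illustrative stand-ins rather than Spark's API; only the shape of the predicate mirrors the one-line change in the diff.

// Minimal sketch of the new peer check; all names here are illustrative
// stand-ins, not Spark's source.
object PeerCheckSketch {
  // Stand-in for org.apache.spark.storage.BlockManagerId.
  final case class BlockManagerId(executorId: String, host: String, port: Int)

  // Stand-in for FallbackStorage.FALLBACK_BLOCK_MANAGER_ID: a synthetic id
  // that marks the fallback storage rather than a live executor.
  val FallbackId: BlockManagerId = BlockManagerId("fallback", "remote", 7337)

  // Old check: proceed whenever the peer list is non-empty.
  def hasAnyPeer(peers: Seq[BlockManagerId]): Boolean = peers.nonEmpty

  // New check: proceed only if some peer is not the fallback id.
  def hasViablePeer(peers: Seq[BlockManagerId]): Boolean =
    peers.exists(_ != FallbackId)

  def main(args: Array[String]): Unit = {
    val realPeer = BlockManagerId("exec-1", "host1", 50010)
    // A fallback-only list passed the old check, so the migration loop
    // retried forever; the new check tells it to stop instead.
    assert(hasAnyPeer(Seq(FallbackId)) && !hasViablePeer(Seq(FallbackId)))
    assert(!hasViablePeer(Seq.empty))                 // no peers: stop
    assert(hasViablePeer(Seq(realPeer, FallbackId)))  // real peer: migrate
  }
}

The design point, as the patch implies: cached RDD blocks, unlike shuffle blocks, cannot be migrated to the fallback storage, so a peer list containing only the fallback id is effectively empty for RDD migration and the loop should stop rather than retry.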


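For the "How was this patch tested?" note, here is a hedged sketch of the verification idiom the new unit test relies on: poll with ScalaTest's Eventually until the worker gives up, and use Mockito's never() to assert that replication was not attempted. Replicator and GiveUpIdiomSuite are invented for illustration; only the eventually(timeout, interval) plus verify(..., never()) combination mirrors the real test.

import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.duration._

import org.mockito.Mockito.{mock, never, verify}
import org.scalatest.concurrent.Eventually._
import org.scalatest.funsuite.AnyFunSuite

// Invented dependency to mock; the real test mocks BlockManager instead.
trait Replicator { def replicateBlock(blockId: String): Unit }

class GiveUpIdiomSuite extends AnyFunSuite {
  test("gives up without replicating when no viable peer exists") {
    val replicator = mock(classOf[Replicator])
    val stoppedRDD = new AtomicBoolean(false)
    // Stand-in for the decommissioner thread deciding to stop.
    new Thread(() => stoppedRDD.set(true)).start()

    // Poll until the worker has stopped, then confirm the mock was never
    // asked to replicate: the same eventually + never() combination the new
    // test applies to bm.replicateBlock and bmDecomManager.stoppedRDD.
    eventually(timeout(5.seconds), interval(10.milliseconds)) {
      assert(stoppedRDD.get())
      verify(replicator, never()).replicateBlock("rdd_0_0")
    }
  }
}
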
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
