This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5e417ff8b14  [SPARK-44547][CORE] Ignore fallback storage for cached RDD migration
5e417ff8b14 is described below

commit 5e417ff8b14764d28ccc9561cb268de83b15eff2
Author: Frank Yin <fra...@ziprecruiter.com>
AuthorDate: Thu Aug 24 22:19:41 2023 -0700

    [SPARK-44547][CORE] Ignore fallback storage for cached RDD migration

    ### What changes were proposed in this pull request?
    Fix a bug that makes the cached RDD decommissioner never finish.

    ### Why are the changes needed?
    The cached RDD decommissioner gets stuck in an endless retry loop when the only viable peer is the fallback storage, which it does not know how to handle.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Tests are added, and the change was verified by running Spark jobs.

    Closes #42155 from ukby1234/franky.SPARK-44547.

    Authored-by: Frank Yin <fra...@ziprecruiter.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 47555da2ae292b07488ba181db1aceac8e7ddb3a)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
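The retry behavior is easy to see in isolation. Below is a minimal, self-contained Scala sketch (illustrative names only, e.g. `PeerId` and `FallbackId`; this is not the real BlockManager API): when the only registered peer is the fallback storage's pseudo block manager, the old `isEmpty` check concludes that a peer is available and the migration loop keeps retrying, while the new check gives up as intended.

```scala
object PeerCheckSketch {
  // Illustrative stand-ins for BlockManagerId and FallbackStorage.FALLBACK_BLOCK_MANAGER_ID;
  // NOT the Spark API, just a model of the decommissioner's peer list.
  final case class PeerId(executorId: String, host: String, port: Int)
  val FallbackId: PeerId = PeerId("fallback", "remote", 7337)

  def main(args: Array[String]): Unit = {
    // The decommissioning executor sees exactly one peer: the fallback storage.
    val peers: Seq[PeerId] = Seq(FallbackId)

    // Old check (pre-SPARK-44547): the list is non-empty, so migration never stops.
    val oldCheckStops = peers.isEmpty
    // New check: stop unless at least one peer other than the fallback storage exists.
    val newCheckStops = !peers.exists(_ != FallbackId)

    println(s"old check stops migration: $oldCheckStops") // false -> endless retries
    println(s"new check stops migration: $newCheckStops") // true  -> gives up cleanly
  }
}
```

The fallback storage can receive migrated shuffle blocks but not cached RDD blocks, which is why it must be excluded when deciding whether any RDD migration target exists; the diff below applies exactly that filter.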
---
 .../spark/storage/BlockManagerDecommissioner.scala |  4 +--
 .../BlockManagerDecommissionUnitSuite.scala        | 35 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index ecd64b6695a..32df6388cab 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -184,7 +184,7 @@ private[storage] class BlockManagerDecommissioner(
 
   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
-  @volatile private var stoppedRDD =
+  @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
   @volatile private var stoppedShuffle =
     !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
@@ -204,7 +204,7 @@ private[storage] class BlockManagerDecommissioner(
       logInfo("Attempting to migrate all RDD blocks")
       while (!stopped && !stoppedRDD) {
         // Validate if we have peers to migrate to. Otherwise, give up migration.
-        if (bm.getPeers(false).isEmpty) {
+        if (!bm.getPeers(false).exists(_ != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)) {
           logWarning("No available peers to receive RDD blocks, stop migration.")
           stoppedRDD = true
         } else {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index b7ac378b4c6..f9263f41060 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import scala.concurrent.duration._
 
 import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, never, times, verify, when}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.matchers.must.Matchers
 
@@ -305,4 +305,37 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
       bmDecomManager.stop()
     }
   }
+
+  test("SPARK-44547: test cached rdd migration no available hosts") {
+    val blockTransferService = mock(classOf[BlockTransferService])
+    val bm = mock(classOf[BlockManager])
+
+    val storedBlockId1 = RDDBlockId(0, 0)
+    val storedBlock1 =
+      new ReplicateBlock(storedBlockId1, Seq(BlockManagerId("replicaHolder", "host1", bmPort)), 1)
+
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    registerShuffleBlocks(migratableShuffleBlockResolver, Set())
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
+
+    when(bm.blockTransferService).thenReturn(blockTransferService)
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq(storedBlock1))
+
+    val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
+
+    try {
+      bmDecomManager.start()
+      eventually(timeout(100.second), interval(10.milliseconds)) {
+        verify(bm, never()).replicateBlock(
+          mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3)))
+        assert(bmDecomManager.rddBlocksLeft)
+        assert(bmDecomManager.stoppedRDD)
+      }
+    } finally {
+      bmDecomManager.stop()
+    }
+  }
 }
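The changed code path only comes into play when storage decommissioning is enabled and a fallback storage path is configured, since that is what makes the fallback pseudo block manager show up among the peers. Below is a sketch of the relevant settings, assuming the configuration key names documented for Spark 3.x; verify them against the exact version in use.

```scala
import org.apache.spark.SparkConf

object FallbackStorageConfSketch {
  // Key names are assumptions based on the Spark 3.x documentation, not part of this commit.
  val conf: SparkConf = new SparkConf()
    .set("spark.decommission.enabled", "true")
    .set("spark.storage.decommission.enabled", "true")
    .set("spark.storage.decommission.rddBlocks.enabled", "true")
    .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
    // With a fallback path set, FallbackStorage.FALLBACK_BLOCK_MANAGER_ID is registered as a peer,
    // which is the situation the new test reproduces with a mocked getPeers.
    .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")
}
```

With this fix, a decommissioning executor whose only remaining peer is the fallback storage stops RDD block migration instead of retrying forever, while shuffle block migration to the fallback storage is unaffected.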