This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9faad07  HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks
9faad07 is described below

commit 9faad07ce706890008a8a3ce675fa95b0bdf7c14
Author: Holden Karau <hka...@apple.com>
AuthorDate: Fri Apr 24 18:51:25 2020 -0700

    HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks

    HOTFIX test issue introduced in SPARK-20732

    Closes #28337 from holdenk/revert-SPARK-20732.

    Authored-by: Holden Karau <hka...@apple.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../org/apache/spark/internal/config/package.scala |  28 -----
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  17 +--
 .../org/apache/spark/storage/BlockManager.scala    | 129 ++-------------------
 .../apache/spark/storage/BlockManagerMaster.scala  |  10 --
 .../spark/storage/BlockManagerMasterEndpoint.scala |  50 +-------
 .../spark/storage/BlockManagerMessages.scala       |   7 --
 .../spark/storage/BlockManagerSlaveEndpoint.scala  |   3 -
 .../storage/BlockManagerDecommissionSuite.scala    | 104 -----------------
 .../apache/spark/storage/BlockManagerSuite.scala   |  58 ---------
 9 files changed, 12 insertions(+), 394 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1bc2734..5006da0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -413,34 +413,6 @@ package object config {
       .intConf
       .createWithDefault(1)
 
-  private[spark] val STORAGE_DECOMMISSION_ENABLED =
-    ConfigBuilder("spark.storage.decommission.enabled")
-      .doc("Whether to decommission the block manager when decommissioning executor")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
-    ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
-      .internal()
-      .doc("Maximum number of failures which can be handled for the replication of " +
-        "one RDD block when block manager is decommissioning and trying to move its " +
-        "existing blocks.")
-      .version("3.1.0")
-      .intConf
-      .createWithDefault(3)
-
-  private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL =
-    ConfigBuilder("spark.storage.decommission.replicationReattemptInterval")
-      .internal()
-      .doc("The interval of time between consecutive cache block replication reattempts " +
-        "happening on each decommissioning executor (due to storage decommissioning).")
-      .version("3.1.0")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .checkValue(_ > 0, "Time interval between two consecutive attempts of " +
-        "cache block replication should be positive.")
-      .createWithDefaultString("30s")
-
   private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
     ConfigBuilder("spark.storage.replication.topologyFile")
       .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 67638a5..701d69b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -438,19 +438,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         logError(s"Unexpected error during decommissioning ${e.toString}", e)
       }
       logInfo(s"Finished decommissioning executor $executorId.")
-
-      if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-        try {
-          logInfo("Starting decommissioning block manager corresponding to " +
-            s"executor $executorId.")
-          scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-        } catch {
-          case e: Exception =>
-            logError("Unexpected error during block manager " +
-              s"decommissioning for executor $executorId: ${e.toString}", e)
-        }
-        logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
-      }
     } else {
       logInfo(s"Skipping decommissioning of executor $executorId.")
     }
@@ -587,7 +574,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
  private[spark] def decommissionExecutor(executorId: String): Unit = {
     if (driverEndpoint != null) {
-      logInfo("Propagating executor decommission to driver.")
+      logInfo("Propegating executor decommission to driver.")
       driverEndpoint.send(DecommissionExecutor(executorId))
     }
   }
@@ -671,7 +658,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Update the cluster manager on our scheduling needs. Three bits of information are included
    * to help it make decisions.
-   * @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
+   * @param resourceProfileToNumExecutors The total number of executors we'd like to have per
    *                                      ResourceProfile. The cluster manager shouldn't kill any
    *                                      running executor to reach this number, but, if all
    *                                      existing executors were to die, this is the number
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index aa15d12..e7f8de5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -54,7 +54,6 @@ import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
-import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
 import org.apache.spark.storage.memory._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
@@ -242,9 +241,6 @@ private[spark] class BlockManager(
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
-  private var blockManagerDecommissioning: Boolean = false
-  private var decommissionManager: Option[BlockManagerDecommissionManager] = None
-
   // A DownloadFileManager used to track all the files of remote blocks which are above the
   // specified memory threshold. Files will be deleted automatically based on weak reference.
   // Exposed for test
@@ -1555,22 +1551,18 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicates a block to peer block managers based on existingReplicas and maxReplicas
+   * Called for pro-active replenishment of blocks lost due to executor failures
    *
    * @param blockId blockId being replicate
    * @param existingReplicas existing block managers that have a replica
   * @param maxReplicas maximum replicas needed
-   * @param maxReplicationFailures number of replication failures to tolerate before
-   *                               giving up.
-   * @return whether block was successfully replicated or not
    */
   def replicateBlock(
       blockId: BlockId,
       existingReplicas: Set[BlockManagerId],
-      maxReplicas: Int,
-      maxReplicationFailures: Option[Int] = None): Boolean = {
+      maxReplicas: Int): Unit = {
     logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
-    blockInfoManager.lockForReading(blockId).forall { info =>
+    blockInfoManager.lockForReading(blockId).foreach { info =>
       val data = doGetLocalBytes(blockId, info)
       val storageLevel = StorageLevel(
         useDisk = info.level.useDisk,
@@ -1578,13 +1570,11 @@ private[spark] class BlockManager(
         useOffHeap = info.level.useOffHeap,
         deserialized = info.level.deserialized,
         replication = maxReplicas)
-      // we know we are called as a result of an executor removal or because the current executor
-      // is getting decommissioned. so we refresh peer cache before trying replication, we won't
-      // try to replicate to a missing executor/another decommissioning executor
+      // we know we are called as a result of an executor removal, so we refresh peer cache
+      // this way, we won't try to replicate to a missing executor with a stale reference
       getPeers(forceFetch = true)
       try {
-        replicate(
-          blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures)
+        replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
       } finally {
         logDebug(s"Releasing lock for $blockId")
         releaseLockAndDispose(blockId, data)
@@ -1601,11 +1591,9 @@ private[spark] class BlockManager(
       data: BlockData,
       level: StorageLevel,
       classTag: ClassTag[_],
-      existingReplicas: Set[BlockManagerId] = Set.empty,
-      maxReplicationFailures: Option[Int] = None): Boolean = {
+      existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
 
-    val maxReplicationFailureCount = maxReplicationFailures.getOrElse(
-      conf.get(config.STORAGE_MAX_REPLICATION_FAILURE))
+    val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
     val tLevel = StorageLevel(
       useDisk = level.useDisk,
       useMemory = level.useMemory,
@@ -1629,7 +1617,7 @@ private[spark] class BlockManager(
       blockId,
       numPeersToReplicateTo)
 
-    while(numFailures <= maxReplicationFailureCount &&
+    while(numFailures <= maxReplicationFailures &&
       !peersForReplication.isEmpty &&
       peersReplicatedTo.size < numPeersToReplicateTo) {
       val peer = peersForReplication.head
@@ -1677,11 +1665,9 @@ private[spark] class BlockManager(
     if (peersReplicatedTo.size < numPeersToReplicateTo) {
       logWarning(s"Block $blockId replicated to only " +
         s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
-      return false
     }
 
     logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
-    return true
   }
 
   /**
@@ -1775,58 +1761,6 @@ private[spark] class BlockManager(
     blocksToRemove.size
   }
 
-  def decommissionBlockManager(): Unit = {
-    if (!blockManagerDecommissioning) {
-      logInfo("Starting block manager decommissioning process")
-      blockManagerDecommissioning = true
-      decommissionManager = Some(new BlockManagerDecommissionManager(conf))
-      decommissionManager.foreach(_.start())
-    } else {
-      logDebug("Block manager already in decommissioning state")
-    }
-  }
-
-  /**
-   * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
-   * Visible for testing
-   */
-  def decommissionRddCacheBlocks(): Unit = {
-    val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)
-
-    if (replicateBlocksInfo.nonEmpty) {
-      logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
-        "for block manager decommissioning")
-    }
-
-    // Maximum number of storage replication failure which replicateBlock can handle
-    val maxReplicationFailures = conf.get(
-      config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
-
-    // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
-    // so that we end up prioritize them over each other
-    val blocksFailedReplication = ThreadUtils.parmap(
-      replicateBlocksInfo, "decommissionRddCacheBlocks", 4) {
-      case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
-        val replicatedSuccessfully = replicateBlock(
-          blockId,
-          existingReplicas.toSet,
-          maxReplicas,
-          maxReplicationFailures = Some(maxReplicationFailures))
-        if (replicatedSuccessfully) {
-          logInfo(s"Block $blockId offloaded successfully, Removing block now")
-          removeBlock(blockId)
-          logInfo(s"Block $blockId removed")
-        } else {
-          logWarning(s"Failed to offload block $blockId")
-        }
-        (blockId, replicatedSuccessfully)
-    }.filterNot(_._2).map(_._1)
-    if (blocksFailedReplication.nonEmpty) {
-      logWarning("Blocks failed replication in cache decommissioning " +
-        s"process: ${blocksFailedReplication.mkString(",")}")
-    }
-  }
-
   /**
    * Remove all blocks belonging to the given broadcast.
    */
@@ -1895,52 +1829,7 @@ private[spark] class BlockManager(
     data.dispose()
   }
 
-  /**
-   * Class to handle block manager decommissioning retries
-   * It creates a Thread to retry offloading all RDD cache blocks
-   */
-  private class BlockManagerDecommissionManager(conf: SparkConf) {
-    @volatile private var stopped = false
-    private val blockReplicationThread = new Thread {
-      override def run(): Unit = {
-        while (blockManagerDecommissioning && !stopped) {
-          try {
-            logDebug("Attempting to replicate all cached RDD blocks")
-            decommissionRddCacheBlocks()
-            logInfo("Attempt to replicate all cached blocks done")
-            val sleepInterval = conf.get(
-              config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
-            Thread.sleep(sleepInterval)
-          } catch {
-            case _: InterruptedException =>
-              // no-op
-            case NonFatal(e) =>
-              logError("Error occurred while trying to " +
-                "replicate cached RDD blocks for block manager decommissioning", e)
-          }
-        }
-      }
-    }
-    blockReplicationThread.setDaemon(true)
-    blockReplicationThread.setName("block-replication-thread")
-
-    def start(): Unit = {
-      logInfo("Starting block replication thread")
-      blockReplicationThread.start()
-    }
-
-    def stop(): Unit = {
-      if (!stopped) {
-        stopped = true
-        logInfo("Stopping block replication thread")
-        blockReplicationThread.interrupt()
-        blockReplicationThread.join()
-      }
-    }
-  }
-
   def stop(): Unit = {
-    decommissionManager.foreach(_.stop())
     blockTransferService.close()
     if (blockStoreClient ne blockTransferService) {
       // Closing should be idempotent, but maybe not for the NioBlockTransferService.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 3cfa5d2..e440c1a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -43,16 +43,6 @@ class BlockManagerMaster(
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
-  /** Decommission block managers corresponding to given set of executors */
-  def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
-    driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
-  }
-
-  /** Get Replication Info for all the RDD blocks stored in given blockManagerId */
-  def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
-    driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId))
-  }
-
   /** Request removal of a dead executor from the driver endpoint.
    *  This is only called on the driver side. Non-blocking
    */
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d936420..d7f7eed 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -65,9 +65,6 @@ class BlockManagerMasterEndpoint(
   // Mapping from executor ID to block manager ID.
   private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
 
-  // Set of block managers which are decommissioning
-  private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId]
-
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
@@ -156,13 +153,6 @@ class BlockManagerMasterEndpoint(
       removeExecutor(execId)
       context.reply(true)
 
-    case DecommissionBlockManagers(executorIds) =>
-      decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get))
-      context.reply(true)
-
-    case GetReplicateInfoForRDDBlocks(blockManagerId) =>
-      context.reply(getReplicateInfoForRDDBlocks(blockManagerId))
-
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
@@ -267,7 +257,6 @@ class BlockManagerMasterEndpoint(
 
     // Remove the block manager from blockManagerIdByExecutor.
     blockManagerIdByExecutor -= blockManagerId.executorId
-    decommissioningBlockManagerSet.remove(blockManagerId)
 
     // Remove it from blockManagerInfo and remove all the blocks.
     blockManagerInfo.remove(blockManagerId)
@@ -310,39 +299,6 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
-  /**
-   * Decommission the given Seq of blockmanagers
-   *    - Adds these block managers to decommissioningBlockManagerSet Set
-   *    - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]]
-   */
-  def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = {
-    val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
-    val futures = newBlockManagersToDecommission.map { blockManagerId =>
-      decommissioningBlockManagerSet.add(blockManagerId)
-      val info = blockManagerInfo(blockManagerId)
-      info.slaveEndpoint.ask[Unit](DecommissionBlockManager)
-    }
-    Future.sequence{ futures.toSeq }
-  }
-
-  /**
-   * Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId
-   * @param blockManagerId - block manager id for which ReplicateBlock info is needed
-   * @return Seq of ReplicateBlock
-   */
-  private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
-    val info = blockManagerInfo(blockManagerId)
-
-    val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
-    rddBlocks.map { blockId =>
-      val currentBlockLocations = blockLocations.get(blockId)
-      val maxReplicas = currentBlockLocations.size + 1
-      val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
-      val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
-      replicateMsg
-    }.toSeq
-  }
-
   // Remove a block from the slaves that have it. This can only be used to remove
   // blocks that the master knows about.
   private def removeBlockFromWorkers(blockId: BlockId): Unit = {
@@ -580,11 +536,7 @@ class BlockManagerMasterEndpoint(
   private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
     val blockManagerIds = blockManagerInfo.keySet
     if (blockManagerIds.contains(blockManagerId)) {
-      blockManagerIds
-        .filterNot { _.isDriver }
-        .filterNot { _ == blockManagerId }
-        .diff(decommissioningBlockManagerSet)
-        .toSeq
+      blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
     } else {
       Seq.empty
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 7d4f2ff..895f48d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -36,8 +36,6 @@ private[spark] object BlockManagerMessages {
   case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
     extends ToBlockManagerSlave
 
-  case object DecommissionBlockManager extends ToBlockManagerSlave
-
   // Remove all blocks belonging to a specific RDD.
   case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
 
@@ -127,11 +125,6 @@ private[spark] object BlockManagerMessages {
 
   case object GetStorageStatus extends ToBlockManagerMaster
 
-  case class DecommissionBlockManagers(executorIds: Seq[String]) extends ToBlockManagerMaster
-
-  case class GetReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId)
-    extends ToBlockManagerMaster
-
   case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
     extends ToBlockManagerMaster
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index a3a7149..29e2114 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -61,9 +61,6 @@ class BlockManagerSlaveEndpoint(
         SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
       }
 
-    case DecommissionBlockManager =>
-      context.reply(blockManager.decommissionBlockManager())
-
     case RemoveBroadcast(broadcastId, _) =>
       doAsync[Int]("removing broadcast " + broadcastId, context) {
         blockManager.removeBroadcast(broadcastId, tellMaster = true)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
deleted file mode 100644
index 59fb056..0000000
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.Semaphore
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration._
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success}
-import org.apache.spark.internal.config
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
-import org.apache.spark.util.ThreadUtils
-
-class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
-
-  override def beforeEach(): Unit = {
-    val conf = new SparkConf().setAppName("test").setMaster("local")
-      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
-      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 100L)
-      .set(config.STORAGE_DECOMMISSION_ENABLED, true)
-
-    sc = new SparkContext("local-cluster[3, 1, 1024]", "test", conf)
-  }
-
-  test(s"verify that an already running task which is going to cache data succeeds " +
-    s"on a decommissioned executor") {
-    // Create input RDD with 10 partitions
-    val input = sc.parallelize(1 to 10, 10)
-    val accum = sc.longAccumulator("mapperRunAccumulator")
-    // Do a count to wait for the executors to be registered.
-    input.count()
-
-    // Create a new RDD where we have sleep in each partition, we are also increasing
-    // the value of accumulator in each partition
-    val sleepyRdd = input.mapPartitions { x =>
-      Thread.sleep(500)
-      accum.add(1)
-      x
-    }
-
-    // Listen for the job
-    val sem = new Semaphore(0)
-    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
-    sc.addSparkListener(new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-        sem.release()
-      }
-
-      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-        taskEndEvents.append(taskEnd)
-      }
-    })
-
-    // Cache the RDD lazily
-    sleepyRdd.persist()
-
-    // Start the computation of RDD - this step will also cache the RDD
-    val asyncCount = sleepyRdd.countAsync()
-
-    // Wait for the job to have started
-    sem.acquire(1)
-
-    // Decommission one of the executor
-    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
-    val execs = sched.getExecutorIds()
-    assert(execs.size == 3, s"Expected 3 executors but found ${execs.size}")
-    val execToDecommission = execs.head
-    sched.decommissionExecutor(execToDecommission)
-
-    // Wait for job to finish
-    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 3.seconds)
-    assert(asyncCountResult === 10)
-    // All 10 tasks finished, so accum should have been increased 10 times
-    assert(accum.value === 10)
-
-    // All tasks should be successful, nothing should have failed
-    sc.listenerBus.waitUntilEmpty()
-    assert(taskEndEvents.size === 10) // 10 mappers
-    assert(taskEndEvents.map(_.reason).toSet === Set(Success))
-
-    // Since the RDD is cached, so further usage of same RDD should use the
-    // cached data. Original RDD partitions should not be recomputed i.e. accum
-    // should have same value like before
-    assert(sleepyRdd.count() === 10)
-    assert(accum.value === 10)
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index eb875dc..8d06768 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1706,64 +1706,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     verify(liveListenerBus, never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo)))
   }
 
-  test("test decommission block manager should not be part of peers") {
-    val exec1 = "exec1"
-    val exec2 = "exec2"
-    val exec3 = "exec3"
-    val store1 = makeBlockManager(2000, exec1)
-    val store2 = makeBlockManager(2000, exec2)
-    val store3 = makeBlockManager(2000, exec3)
-
-    assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec1, exec2))
-
-    val data = new Array[Byte](400)
-    val blockId = rdd(0, 0)
-    store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2)
-    assert(master.getLocations(blockId).size === 2)
-
-    master.decommissionBlockManagers(Seq(exec1))
-    // store1 is decommissioned, so it should not be part of peer list for store3
-    assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec2))
-  }
-
-  test("test decommissionRddCacheBlocks should offload all cached blocks") {
-    val store1 = makeBlockManager(2000, "exec1")
-    val store2 = makeBlockManager(2000, "exec2")
-    val store3 = makeBlockManager(2000, "exec3")
-
-    val data = new Array[Byte](400)
-    val blockId = rdd(0, 0)
-    store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2)
-    assert(master.getLocations(blockId).size === 2)
-    assert(master.getLocations(blockId).contains(store1.blockManagerId))
-
-    store1.decommissionRddCacheBlocks()
-    assert(master.getLocations(blockId).size === 2)
-    assert(master.getLocations(blockId).toSet === Set(store2.blockManagerId,
-      store3.blockManagerId))
-  }
-
-  test("test decommissionRddCacheBlocks should keep the block if it is not able to offload") {
-    val store1 = makeBlockManager(12000, "exec1")
-    val store2 = makeBlockManager(2000, "exec2")
-
-    val dataLarge = new Array[Byte](5000)
-    val blockIdLarge = rdd(0, 0)
-    val dataSmall = new Array[Byte](500)
-    val blockIdSmall = rdd(0, 1)
-
-    store1.putSingle(blockIdLarge, dataLarge, StorageLevel.MEMORY_ONLY)
-    store1.putSingle(blockIdSmall, dataSmall, StorageLevel.MEMORY_ONLY)
-    assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
-    assert(master.getLocations(blockIdSmall) === Seq(store1.blockManagerId))
-
-    store1.decommissionRddCacheBlocks()
-    // Smaller block offloaded to store2
-    assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId))
-    // Larger block still present in store1 as it can't be offloaded
-    assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
-  }
-
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
     var tempFileManager: DownloadFileManager = null

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org