[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1125602608

core/src/main/scala/org/apache/spark/internal/config/package.scala:

```
@@ -2468,4 +2468,15 @@ package object config {
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+    ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+      .internal()
+      .doc("Set to be true to enabled RDD cache block's visibility status. Once it's enabled," +
+        " a RDD cache block can be used only when it's marked as visible. And a RDD block will be" +
+        " marked as visible only when one of the tasks generating the cache block finished" +
+        " successfully. This is relevant in context of consistent accumulator status.")
+      .version("3.4.0")
```

Review Comment: Sure, will do.
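Since the flag above is a plain boolean conf, enabling it from application code is one `set` call. A minimal sketch, assuming a local test app (the app name and master below are illustrative, not from the PR):

```
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical wiring; only the config key comes from the diff above.
val conf = new SparkConf()
  .setAppName("rdd-cache-visibility-sketch")
  .setMaster("local[2]")
  .set("spark.rdd.cache.visibilityTracking.enabled", "true")
val sc = new SparkContext(conf)
```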
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1122750718

core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

```
@@ -1325,31 +1328,74 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }

+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   *         could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible)
+    if (res.isLeft && !isCacheVisible) {
+      // Block exists but not visible, report taskId -> blockId info to master.
+      master.updateRDDBlockTaskInfo(blockId, taskId)
+    }
+
+    res
+  }
+
   /**
    * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
    * to compute the block, persist it, and return its values.
    *
    * @return either a BlockResult if the block was successfully cached, or an iterator if the block
    *         could not be cached.
    */
-  def getOrElseUpdate[T](
+  private def getOrElseUpdate[T](
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[T],
-      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
-    // Attempt to read the block from local or remote storage. If it's present, then we don't need
-    // to go through the local-get-or-put path.
-    get[T](blockId)(classTag) match {
-      case Some(block) =>
-        return Left(block)
-      case _ =>
-        // Need to compute the block.
+      makeIterator: () => Iterator[T],
+      isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {
```

Review Comment: done, thanks.
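For orientation, the caller of `getOrElseUpdateRDDBlock` is the task-side cache path (`RDD.getOrCompute`). A hedged sketch of the call shape, with a hypothetical helper name and no locking concerns; it assumes Spark-internal visibility of `BlockManager`, so treat it as illustrative only:

```
import scala.reflect.ClassTag
import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel}

// Illustrative only: serve cached values when the Left branch is returned,
// otherwise fall back to the iterator that computes the partition.
def readOrCompute[T: ClassTag](
    bm: BlockManager,
    taskId: Long,
    blockId: RDDBlockId,
    compute: () => Iterator[T]): Iterator[T] = {
  bm.getOrElseUpdateRDDBlock(taskId, blockId, StorageLevel.MEMORY_AND_DISK,
      implicitly[ClassTag[T]], compute) match {
    case Left(blockResult) => blockResult.data.asInstanceOf[Iterator[T]]
    case Right(iter) => iter // block could not be cached; just consume the iterator
  }
}
```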
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1122750441

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:

```
@@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging {
     try {
       val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
       while (true) {
-        val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+        val previous = invisibleRDDBlocks.synchronized {
```

Review Comment: done, thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028518

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:

```
@@ -2266,6 +2270,160 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     }
   }
+  test("SPARK-41497: getOrElseUpdateRDDBlock do compute based on cache visibility statue") {
+    val store = makeBlockManager(8000, "executor1")
+    val blockId = RDDBlockId(rddId = 1, splitIndex = 1)
+    var computed: Boolean = false
+    val data = Seq(1, 2, 3)
+    val makeIterator = () => {
+      computed = true
+      data.iterator
+    }
+
+    // Cache doesn't exist and is not visible.
+    assert(store.getStatus(blockId).isEmpty && !store.isRDDBlockVisible(blockId))
+    val res1 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Put cache successfully and reported block task info.
+    assert(res1.isLeft && computed)
+    verify(master, times(1)).updateRDDBlockTaskInfo(blockId, 1)
+
+    // Cache exists but not visible.
+    computed = false
+    assert(store.getStatus(blockId).nonEmpty && !store.isRDDBlockVisible(blockId))
+    val res2 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Load cache successfully and reported block task info.
+    assert(res2.isLeft && computed)
+    verify(master, times(2)).updateRDDBlockTaskInfo(blockId, 1)
```

Review Comment: Hi @mridulm, actually this is to make sure that the taskId -> blockId information will be reported only when the block is invisible, as this is part of the workflow design. If this doesn't make sense, I can remove the `verify` statement in a new iteration. Thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028223

core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala:

```
@@ -81,6 +81,8 @@ class BlockManagerStorageEndpoint(
     case ReplicateBlock(blockId, replicas, maxReplicas) =>
       context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))

+    case MarkRDDBlockAsVisible(blockId) =>
```

Review Comment: Of course. Updated.

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:

```
@@ -2266,6 +2270,160 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     }
   }
+  test("SPARK-41497: getOrElseUpdateRDDBlock do compute based on cache visibility statue") {
+    val store = makeBlockManager(8000, "executor1")
+    val blockId = RDDBlockId(rddId = 1, splitIndex = 1)
+    var computed: Boolean = false
+    val data = Seq(1, 2, 3)
+    val makeIterator = () => {
+      computed = true
+      data.iterator
+    }
+
+    // Cache doesn't exist and is not visible.
+    assert(store.getStatus(blockId).isEmpty && !store.isRDDBlockVisible(blockId))
+    val res1 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Put cache successfully and reported block task info.
+    assert(res1.isLeft && computed)
+    verify(master, times(1)).updateRDDBlockTaskInfo(blockId, 1)
+
+    // Cache exists but not visible.
+    computed = false
+    assert(store.getStatus(blockId).nonEmpty && !store.isRDDBlockVisible(blockId))
```

Review Comment: Thanks, done.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028179

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:

```
@@ -210,6 +220,65 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
+
+    case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+      // This is to report the information that a rdd block(with `blockId`) is computed
+      // and cached by task(with `taskId`). And this happens right after the task finished
+      // computing/caching the block only when the block is not visible yet. And the rdd
+      // block will be marked as visible when the corresponding task finished successfully.
+      context.reply(updateRDDBlockTaskInfo(blockId, taskId))
+
+    case GetRDDBlockVisibility(blockId) =>
+      // Get the visibility status of a specific rdd block.
+      context.reply(isRDDBlockVisible(blockId))
+
+    case UpdateRDDBlockVisibility(taskId, visible) =>
+      // This is to report the information that whether rdd blocks computed by task(with `taskId`)
+      // can be turned to be visible. This is reported by DAGScheduler right after task completes.
+      // If the task finished successfully, rdd blocks can be turned to be visible, otherwise rdd
+      // blocks' visibility status won't change.
+      context.reply(updateRDDBlockVisibility(taskId, visible))
   }

+  private def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    if (trackingCacheVisibility) {
+      blockLocations.containsKey(blockId) &&
+        blockLocations.get(blockId).nonEmpty && !invisibleRDDBlocks.contains(blockId)
+    } else {
+      // Blocks should always be visible if the feature flag is disabled.
+      true
+    }
+  }
+
+  private def updateRDDBlockVisibility(taskId: Long, visible: Boolean): Unit = {
+    if (!trackingCacheVisibility) {
+      // Do nothing if the feature flag is disabled.
+      return
+    }
+
+    if (visible) {
+      tidToRddBlockIds.get(taskId).foreach { blockIds =>
+        blockIds.foreach { blockId =>
+          invisibleRDDBlocks.remove(blockId)
+          // Ask block managers to update the visibility status.
+          val msg = MarkRDDBlockAsVisible(blockId)
+          getLocations(blockId).flatMap(blockManagerInfo.get).foreach { managerInfo =>
+            managerInfo.storageEndpoint.ask[Unit](msg)
+          }
+        }
+      }
+    }
```

Review Comment: Updated, and this is the jira: https://issues.apache.org/jira/browse/SPARK-42582
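For readers without the rest of the patch: the handlers above pattern-match on driver-side RPC messages. A sketch of their shapes as implied by this hunk (the real definitions live in `BlockManagerMessages`; the field types are inferred from the handler bodies, so treat them as assumptions):

```
import org.apache.spark.storage.RDDBlockId

// Shapes inferred from the receiveAndReply cases above.
case class UpdateRDDBlockTaskInfo(blockId: RDDBlockId, taskId: Long)
case class GetRDDBlockVisibility(blockId: RDDBlockId)
case class UpdateRDDBlockVisibility(taskId: Long, visible: Boolean)
// Pushed from the master to the executors that store the block.
case class MarkRDDBlockAsVisible(blockId: RDDBlockId)
```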
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028046

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:

```
@@ -525,6 +562,7 @@ private[storage] class BlockInfoManager extends Logging {
     blockInfoWrappers.clear()
     readLocksByTask.clear()
     writeLocksByTask.clear()
+    invisibleRDDBlocks.clear()
```

Review Comment: Thanks, done.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028028

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:

```
@@ -150,6 +150,12 @@ private[storage] class BlockInfoManager extends Logging {
    */
   private[this] val blockInfoWrappers = new ConcurrentHashMap[BlockId, BlockInfoWrapper]

+  /**
+   * Record invisible rdd blocks stored in the block manager, entries will be removed when blocks
+   * are marked as visible or blocks are removed by [[removeBlock()]].
+   */
+  private[spark] val invisibleRDDBlocks = ConcurrentHashMap.newKeySet[RDDBlockId]
```

Review Comment: Makes sense. Thanks, updated.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028001

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:

```
@@ -180,6 +186,27 @@ private[storage] class BlockInfoManager extends Logging {

   // ----------------------------------------------------------------------------------------------

+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    if (trackingCacheVisibility) {
+      invisibleRDDBlocks.synchronized {
+        blockInfoWrappers.containsKey(blockId) && !invisibleRDDBlocks.contains(blockId)
+      }
+    } else {
+      // Always be visible if the feature flag is disabled.
+      true
+    }
+  }
+
+  private[spark] def tryMarkBlockAsVisible(blockId: RDDBlockId): Unit = {
+    if (trackingCacheVisibility) {
+      invisibleRDDBlocks.synchronized {
+        if (blockInfoWrappers.containsKey(blockId)) {
+          invisibleRDDBlocks.remove(blockId)
```

Review Comment: Thanks, you are right. Updated.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118027972

core/src/main/scala/org/apache/spark/internal/config/package.scala:

```
@@ -2468,4 +2468,15 @@ package object config {
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+    ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+      .internal()
+      .doc("Set to be true to enabled RDD cache block's visibility status. Once it's enabled," +
+        " a RDD cache block can be used only when it's marked as visible. And a RDD block will be" +
+        " marked as visible only when one of the tasks generating the cache block finished" +
+        " successfully. This is relevant in context of consistent accumulator status.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
```

Review Comment: Sounds good. Thanks, updated.

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:

```
@@ -139,7 +139,7 @@ private[storage] object BlockInfo {
  *
  * This class is thread-safe.
  */
-private[storage] class BlockInfoManager extends Logging {
+private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = true) extends Logging {
```

Review Comment: done
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118013777

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:

```
@@ -728,7 +800,22 @@ class BlockManagerMasterEndpoint(
     }

     if (storageLevel.isValid) {
+      val firstBlock = locations.isEmpty
       locations.add(blockManagerId)
+
+      blockId.asRDDId.foreach { rddBlockId =>
+        (trackingCacheVisibility, firstBlock) match {
+          case (true, true) =>
+            // Mark as invisible for the first block.
+            invisibleRDDBlocks.add(rddBlockId)
+          case (true, false) if !invisibleRDDBlocks.contains(rddBlockId) =>
+            // If the rdd block is already visible, ask storage manager to update the visibility
+            // status.
+            blockManagerInfo(blockManagerId).storageEndpoint
+              .ask[Unit](MarkRDDBlockAsVisible(rddBlockId))
```

Review Comment:
> So ensure that the replica is also marked as visible, right?

Yes, this is one scenario. Another scenario is that once a task fails to fetch the cached block from a remote executor, it will compute and cache the block again.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118005755

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:

```
@@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging {
     try {
       val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
       while (true) {
-        val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+        val previous = invisibleRDDBlocks.synchronized {
+          val res = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+          if (res == null && trackingCacheVisibility) {
+            // Added to invisible blocks if it doesn't exist before.
+            blockId.asRDDId.foreach(invisibleRDDBlocks.add)
```

Review Comment: Currently we synchronize all the write operations, and also the read operations which need to check both variables. For other read operations, since the variables won't change, I think a synchronized block may not be necessary.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1113188852

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:

```
@@ -502,7 +536,10 @@ private[storage] class BlockInfoManager extends Logging {
       throw new IllegalStateException(
         s"Task $taskAttemptId called remove() on block $blockId without a write lock")
     } else {
-      blockInfoWrappers.remove(blockId)
+      invisibleRDDBlocks.synchronized {
+        blockInfoWrappers.remove(blockId)
```

Review Comment: Just want to make sure that state changes for both `blockInfoWrappers` and `invisibleRDDBlocks` are processed in the same synchronized block, to avoid potential concurrency issues, since we depend on the state of the two variables to decide whether a block is visible.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1113157159

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:

```
@@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging {
     try {
       val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
       while (true) {
-        val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+        val previous = invisibleRDDBlocks.synchronized {
+          val res = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+          if (res == null && trackingCacheVisibility) {
+            // Added to invisible blocks if it doesn't exist before.
+            blockId.asRDDId.foreach(invisibleRDDBlocks.add)
```

Review Comment: Currently the way we check whether a block is visible is to check the information from both `blockInfoWrappers` and `invisibleRDDBlocks`, so we need the synchronized block when modifying the state to avoid concurrency issues (`invisibleRDDBlocks` should be updated once a new item is put into `blockInfoWrappers`, within the synchronized block). It will be easier to have the synchronized block here.

```
private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
  if (trackingCacheVisibility) {
    invisibleRDDBlocks.synchronized {
      blockInfoWrappers.containsKey(blockId) && !invisibleRDDBlocks.contains(blockId)
    }
  } else {
    // Always be visible if the feature flag is disabled.
    true
  }
}
```
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112123963

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:

```
@@ -150,6 +150,12 @@ private[storage] class BlockInfoManager extends Logging {
    */
   private[this] val blockInfoWrappers = new ConcurrentHashMap[BlockId, BlockInfoWrapper]

+  /**
+   * Record visible rdd blocks stored in the block manager, entries will be removed
+   * by [[removeBlock()]]
+   */
+  private[spark] val visibleRDDBlocks = ConcurrentHashMap.newKeySet[RDDBlockId]
```

Review Comment: Sounds good, will make the change. Thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112123512

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:

```
@@ -77,6 +78,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

+  // Mapping from task id to the set of rdd blocks which are generated from the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated at least from one successful task.
```

Review Comment: Thanks, fixed the comments.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112123108

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:

```
@@ -210,6 +219,65 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
+
+    case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+      // This is to report the information that a rdd block(with `blockId`) is computed
+      // and cached by task(with `taskId`). The happens right after the task finished
+      // computing/caching the block only when the block is not visible yet. And the rdd
+      // block will be marked as visible only when the corresponding task finished successfully.
+      context.reply(updateRDDBlockTaskInfo(blockId, taskId))
+
+    case GetRDDBlockVisibility(blockId) =>
+      // Get the visibility status of a specific rdd block.
+      if (!trackingCacheVisibility) {
```

Review Comment: Sure, updated.

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:

```
@@ -210,6 +219,65 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
+
+    case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+      // This is to report the information that a rdd block(with `blockId`) is computed
+      // and cached by task(with `taskId`). The happens right after the task finished
```

Review Comment: Thanks, updated.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112122792

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:

```
@@ -2266,6 +2270,150 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     }
   }
+  test("SPARK-41497: getOrElseUpdateRDDBlock do compute based on cache visibility statue") {
+    val store = makeBlockManager(8000, "executor1")
+    val blockId = RDDBlockId(rddId = 1, splitIndex = 1)
+    var computed: Boolean = false
+    val data = Seq(1, 2, 3)
+    val makeIterator = () => {
+      computed = true
+      data.iterator
+    }
+
+    // Cache doesn't exist and is not visible.
+    assert(store.getStatus(blockId).isEmpty && !store.isRDDBlockVisible(blockId))
+    val res1 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Put cache successfully and reported block task info.
+    assert(res1.isLeft && computed)
+    verify(master, times(1)).updateRDDBlockTaskInfo(blockId, 1)
+
+    // Cache exists but not visible.
+    computed = false
+    assert(store.getStatus(blockId).nonEmpty && !store.isRDDBlockVisible(blockId))
+    val res2 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Load cache successfully and reported block task info.
+    assert(res2.isLeft && computed)
+    verify(master, times(2)).updateRDDBlockTaskInfo(blockId, 1)
+
+    // Cache exists and visible.
+    store.blockInfoManager.tryAddVisibleBlock(blockId)
+    computed = false
+    assert(store.getStatus(blockId).nonEmpty && store.isRDDBlockVisible(blockId))
+    val res3 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Load cache successfully but not report block task info.
+    assert(res3.isLeft && !computed)
+    verify(master, times(2)).updateRDDBlockTaskInfo(blockId, 1)
+  }
+
+  test("add block rdd visibility status") {
```

Review Comment: Updated
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112113795

core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

```
@@ -1424,6 +1470,28 @@ private[spark] class BlockManager(
     blockStoreUpdater.save()
   }

+  // Check whether a rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    // Cached blocks are always visible if the feature flag is disabled.
+    if (!trackingCacheVisibility) {
+      return true
+    }
+
+    // If the rdd block visibility information not available in the block manager,
+    // asking master for the information.
+    if (blockInfoManager.isRDDBlockVisible(blockId)) {
+      return true
+    }
+
+    if (master.isRDDBlockVisible(blockId)) {
+      // Cache the visibility status if block exists.
+      blockInfoManager.tryAddVisibleBlock(blockId)
+      true
```

Review Comment: Yes, I think so. Even though the current executor doesn't have the cached block, we can still read the cache from a remote executor.
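The lookup order above (local `BlockInfoManager` first, then the master, then memoize a positive answer) works because visibility is one-way: a block can flip from invisible to visible but never back, so caching only `true` is safe. A self-contained sketch of the same idea, with hypothetical names standing in for the Spark internals:

```
import scala.collection.concurrent.TrieMap

// Illustrative stand-in for the executor-side memoization; `remoteLookup`
// plays the role of master.isRDDBlockVisible.
class VisibilityCache(remoteLookup: String => Boolean) {
  private val locallyVisible = TrieMap.empty[String, Unit]

  def isVisible(blockId: String): Boolean = {
    if (locallyVisible.contains(blockId)) return true
    val visible = remoteLookup(blockId)
    if (visible) locallyVisible.put(blockId, ()) // never cache a negative answer
    visible
  }
}
```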
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r227309

core/src/main/scala/org/apache/spark/internal/config/package.scala:

```
@@ -2468,4 +2468,15 @@ package object config {
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+    ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+      .internal()
+      .doc("Set to be true to enabled RDD cache block's visibility status. Once it's enabled," +
+        " a RDD cache block can be used only when it's marked as visible. And a RDD block will be" +
+        " marked as visible only when one of the tasks generating the cache block finished" +
+        " successfully.")
```

Review Comment: Thanks, updated.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226606

core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

```
@@ -1325,14 +1325,47 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }

+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   *         could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    var computed: Boolean = false
+    val getIterator = () => {
+      computed = true
+      makeIterator()
+    }
+
+    val res = getOrElseUpdate(blockId, level, classTag, getIterator)
+    if (res.isLeft && !isCacheVisible) {
+      if (!computed) {
+        // Loaded from cache, re-compute to update accumulators.
+        makeIterator()
+      }
```

Review Comment: Updated.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226580

core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

```
@@ -1424,6 +1457,16 @@ private[spark] class BlockManager(
     blockStoreUpdater.save()
   }

+  // Check whether a rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    // If the rdd block visibility information not available in the block manager,
+    // asking master for the information.
+    if (blockInfoManager.isRDDBlockVisible(blockId)) {
+      return true
+    }
+    master.isRDDBlockVisible(blockId)
```

Review Comment: Updated, please take a look. Thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226512

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:

```
@@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

+  // Mapping from task id to the set of rdd blocks which are generated from the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated at least from one successful task.
+  private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]
```

Review Comment: Updated. Please take a look. Thanks. cc @Ngone51 @mridulm
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226349

core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

```
@@ -1325,31 +1328,71 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }

+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   *         could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible)
+    if (res.isLeft && !isCacheVisible) {
+      // Block exists but not visible, report taskId -> blockId info to master.
+      master.updateRDDBlockTaskInfo(blockId, taskId)
+    }
+
+    res
+  }
+
   /**
    * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
    * to compute the block, persist it, and return its values.
    *
    * @return either a BlockResult if the block was successfully cached, or an iterator if the block
    *         could not be cached.
    */
-  def getOrElseUpdate[T](
+  private def getOrElseUpdate[T](
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[T],
-      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
-    // Attempt to read the block from local or remote storage. If it's present, then we don't need
-    // to go through the local-get-or-put path.
-    get[T](blockId)(classTag) match {
-      case Some(block) =>
-        return Left(block)
-      case _ =>
-        // Need to compute the block.
+      makeIterator: () => Iterator[T],
+      isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {
+    // Track whether the data is computed or not, force to do the computation later if need to.
+    // The reason we push the force computing later is that once the executor is decommissioned we
+    // will have a better chance to replicate the cache block because of the `checkShouldStore`
+    // validation when putting a new block.
+    var computed: Boolean = false
+    val iterator = () => {
+      computed = true
+      makeIterator()
+    }
+    if (isCacheVisible) {
+      // Attempt to read the block from local or remote storage. If it's present, then we don't
+      // need to go through the local-get-or-put path.
+      get[T](blockId)(classTag) match {
+        case Some(block) =>
+          return Left(block)
+        case _ =>
+          // Need to compute the block.
+      }
     }

     // Initially we hold no locks on this block.
-    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
+    doPutIterator(blockId, iterator, level, classTag, keepReadLock = true) match {
       case None =>
         // doPut() didn't hand work back to us, so the block already existed or was successfully
         // stored. Therefore, we now hold a read lock on the block.
+        if (!isCacheVisible && !computed) {
+          // Force compute to report accumulator updates.
+          makeIterator()
```

Review Comment: Thanks for pointing this out. Updated and added a UT for this.

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:

```
@@ -1787,6 +1792,12 @@ private[spark] class DAGScheduler(
       case _: ExceptionFailure | _: TaskKilled =>
         updateAccumulators(event)
       case _ =>
     }
+    if (trackingCacheVisibility) {
+      // Update rdd blocks' visibility status.
+      blockManagerMaster.updateRDDBlockVisibility(
```

Review Comment: Updated
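The DAGScheduler hunk above is the promotion point: only a successful task attempt turns the blocks it cached visible. A hedged, self-contained sketch of that decision (the call's exact arguments are truncated in the diff, so the callback signature below is an assumption):

```
// Simplified stand-ins for Spark's TaskEndReason hierarchy.
sealed trait TaskEndReason
case object Success extends TaskEndReason
final case class TaskFailed(message: String) extends TaskEndReason

// Assumed shape of the master call: the task id plus whether its blocks become visible.
def onTaskCompletion(
    taskId: Long,
    reason: TaskEndReason,
    updateRDDBlockVisibility: (Long, Boolean) => Unit): Unit = {
  // Per the PR: success promotes the task's cached blocks; any other outcome
  // leaves their visibility unchanged (still invisible).
  updateRDDBlockVisibility(taskId, reason == Success)
}
```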
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226310

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:

```
@@ -210,6 +219,51 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
+
```

Review Comment: Added some comments here, please take a look. Thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1110773664

core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

```
@@ -1325,31 +1328,71 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }

+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   *         could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible)
+    if (res.isLeft && !isCacheVisible) {
+      // Block exists but not visible, report taskId -> blockId info to master.
+      master.updateRDDBlockTaskInfo(blockId, taskId)
+    }
+
+    res
+  }
+
   /**
    * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
    * to compute the block, persist it, and return its values.
    *
    * @return either a BlockResult if the block was successfully cached, or an iterator if the block
    *         could not be cached.
    */
-  def getOrElseUpdate[T](
+  private def getOrElseUpdate[T](
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[T],
-      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
-    // Attempt to read the block from local or remote storage. If it's present, then we don't need
-    // to go through the local-get-or-put path.
-    get[T](blockId)(classTag) match {
-      case Some(block) =>
-        return Left(block)
-      case _ =>
-        // Need to compute the block.
+      makeIterator: () => Iterator[T],
+      isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {
+    // Track whether the data is computed or not, force to do the computation later if need to.
+    // The reason we push the force computing later is that once the executor is decommissioned we
+    // will have a better chance to replicate the cache block because of the `checkShouldStore`
+    // validation when putting a new block.
+    var computed: Boolean = false
+    val iterator = () => {
+      computed = true
+      makeIterator()
+    }
+    if (isCacheVisible) {
+      // Attempt to read the block from local or remote storage. If it's present, then we don't
+      // need to go through the local-get-or-put path.
+      get[T](blockId)(classTag) match {
+        case Some(block) =>
+          return Left(block)
+        case _ =>
+          // Need to compute the block.
+      }
    }

     // Initially we hold no locks on this block.
-    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
+    doPutIterator(blockId, iterator, level, classTag, keepReadLock = true) match {
       case None =>
         // doPut() didn't hand work back to us, so the block already existed or was successfully
         // stored. Therefore, we now hold a read lock on the block.
+        if (!isCacheVisible && !computed) {
+          // Force compute to report accumulator updates.
```

Review Comment: Yes, the recomputation is only for updating accumulators. The result should be the same unless the result is indeterminate.
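To make the "force compute" branch concrete: when a not-yet-visible cached block satisfies the read, the compute closure never ran, so its accumulator side effects are missing, and running it once more restores them. A standalone toy illustration (a plain `var` stands in for a Spark accumulator; all names here are hypothetical):

```
object ForceComputeSketch {
  def main(args: Array[String]): Unit = {
    var accumulator = 0L   // stand-in for a Spark LongAccumulator
    var computed = false
    val makeIterator = () => {
      computed = true
      Iterator(1, 2, 3).map { x => accumulator += 1; x }
    }

    // Simulate a cache hit on an invisible block: the cached values were served,
    // so makeIterator never ran and the accumulator was never bumped.
    val servedFromInvisibleCache = true
    if (servedFromInvisibleCache && !computed) {
      makeIterator().foreach(_ => ()) // re-run purely for the accumulator updates
    }
    println(s"accumulator = $accumulator") // prints 3
  }
}
```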
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1107261503

core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

```
@@ -1424,6 +1457,16 @@ private[spark] class BlockManager(
     blockStoreUpdater.save()
   }

+  // Check whether a rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    // If the rdd block visibility information not available in the block manager,
+    // asking master for the information.
+    if (blockInfoManager.isRDDBlockVisible(blockId)) {
+      return true
+    }
+    master.isRDDBlockVisible(blockId)
```

Review Comment:
> You are right, the current PR is handling it on a second read ... Since we are already checking for blockInfoManager.isRDDBlockVisible(blockId) first. This should cover the case of (1) - and we will always query in case block is available, and we have to distinguish (2). (2.1) would be an optimization we can attempt later on.

Thanks @mridulm. Just want to clarify: are you suggesting that we can also cache the state got from the driver when it's already visible? I think we can update the cached state if the block exists in the executor after getting the result from the driver/master.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1107228885

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:

```
@@ -1787,6 +1792,12 @@ private[spark] class DAGScheduler(
       case _: ExceptionFailure | _: TaskKilled =>
         updateAccumulators(event)
       case _ =>
     }
+    if (trackingCacheVisibility) {
+      // Update rdd blocks' visibility status.
+      blockManagerMaster.updateRDDBlockVisibility(
```

Review Comment: Yes, async call should be fine. Will make the change, thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1107226569

core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

```
@@ -1325,31 +1328,71 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }

+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   *         could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible)
+    if (res.isLeft && !isCacheVisible) {
+      // Block exists but not visible, report taskId -> blockId info to master.
+      master.updateRDDBlockTaskInfo(blockId, taskId)
+    }
+
+    res
+  }
+
   /**
    * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
    * to compute the block, persist it, and return its values.
    *
    * @return either a BlockResult if the block was successfully cached, or an iterator if the block
    *         could not be cached.
    */
-  def getOrElseUpdate[T](
+  private def getOrElseUpdate[T](
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[T],
-      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
-    // Attempt to read the block from local or remote storage. If it's present, then we don't need
-    // to go through the local-get-or-put path.
-    get[T](blockId)(classTag) match {
-      case Some(block) =>
-        return Left(block)
-      case _ =>
-        // Need to compute the block.
+      makeIterator: () => Iterator[T],
+      isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {
+    // Track whether the data is computed or not, force to do the computation later if need to.
+    // The reason we push the force computing later is that once the executor is decommissioned we
+    // will have a better chance to replicate the cache block because of the `checkShouldStore`
+    // validation when putting a new block.
+    var computed: Boolean = false
+    val iterator = () => {
+      computed = true
+      makeIterator()
+    }
+    if (isCacheVisible) {
+      // Attempt to read the block from local or remote storage. If it's present, then we don't
+      // need to go through the local-get-or-put path.
+      get[T](blockId)(classTag) match {
+        case Some(block) =>
+          return Left(block)
+        case _ =>
+          // Need to compute the block.
+      }
    }

     // Initially we hold no locks on this block.
-    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
+    doPutIterator(blockId, iterator, level, classTag, keepReadLock = true) match {
       case None =>
         // doPut() didn't hand work back to us, so the block already existed or was successfully
         // stored. Therefore, we now hold a read lock on the block.
+        if (!isCacheVisible && !computed) {
+          // Force compute to report accumulator updates.
```

Review Comment: Yes, if the block already exists on the node, here we just compute it again without putting the result.

> Could we force put the result iterator even if the block exists in this case?

This would need to modify the locking mechanism a little bit. This is also an issue about indeterminate operations; for determinate operations there is no need to replace the cache here. I am wondering whether the indeterminate framework can cover this case with some other solution?

core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

```
@@ -1325,31 +1328,71 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }

+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   *         could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible)
+    if (res.isLeft && !isCacheVisible) {
+      // Block exists but not visible,
```
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1107215072

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:

```
@@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

+  // Mapping from task id to the set of rdd blocks which are generated from the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated at least from one successful task.
+  private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]
```

Review Comment: Will try to make the change; the scenario I mentioned above would not be a common case.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1103799711 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1424,6 +1457,16 @@ private[spark] class BlockManager( blockStoreUpdater.save() } + // Check whether a rdd block is visible or not. + private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = { +// If the rdd block visibility information not available in the block manager, +// asking master for the information. +if (blockInfoManager.isRDDBlockVisible(blockId)) { + return true +} +master.isRDDBlockVisible(blockId) Review Comment: Hi @mridulm, in the current implementation, once a block becomes visible, the driver sends a broadcast message to the executors that have the cached block data stored, marking the block as visible. The state `visibleRDDBlocks` is cached in [BlockInfoManager](https://github.com/apache/spark/pull/39459/files#diff-fdee2ef66ad5bea5323506395b453145c74f47c8da092dcacd34a66190a20a15). This is effectively a cached visibility state on the executor side, but only on executors that have the cached block stored, updated in a push-based style. With the above mechanism, do you think we still need another cache for the visibility information on executors, or do we also need to cache the state on executors that do not have the cached block data stored? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
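The lookup order being described reduces to a short sketch; the function parameters stand in for the `blockInfoManager` state and the master RPC (signatures are illustrative, not the PR's):

```scala
object VisibilityLookupSketch {
  case class RDDBlockId(rddId: Int, splitIndex: Int)

  // Consult the locally pushed visibility state first; fall back to an RPC
  // to the master only on a local miss (expected to be rare with locality
  // scheduling).
  def isRDDBlockVisible(
      localVisible: RDDBlockId => Boolean, // state broadcast from the driver
      askMaster: RDDBlockId => Boolean     // RPC fallback
  )(blockId: RDDBlockId): Boolean =
    localVisible(blockId) || askMaster(blockId)
}
```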
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1103794439 ## core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala: ## @@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint( // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] + // Mapping from task id to the set of rdd blocks which are generated from the task. + private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]] + // Record the visible RDD blocks which have been generated at least from one successful task. + private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId] Review Comment: > If existing blocks are lost - why would you need that information as they are gone? In other words, how is it different from today's situation (without visibility) - if a block is lost, it is no longer in system. Here is an example of the scenario I am trying to describe:
1. We have a cached block rdd_1_1 which has been successfully cached and marked as visible.
2. The cached block got lost due to an executor loss.
3. Another task on rdd1 got submitted, and the 1st attempt failed after putting the cache block rdd_1_1. For the 2nd attempt, things could go differently:
   a. if we still have the visibility status, the 2nd attempt can use the cached block directly;
   b. otherwise, we still need to do the computation.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1100333545 ## core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala: ## @@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint( // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] + // Mapping from task id to the set of rdd blocks which are generated from the task. + private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]] + // Record the visible RDD blocks which have been generated at least from one successful task. + private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId] Review Comment: Let me explain this further. If we track visible blocks, it's clear that we always know which blocks are visible. If we track invisible blocks, a block is considered visible only when at least one replica exists and it's not in the invisible list. So if the existing blocks get lost, we lose the information. Next time the cache is re-computed, we go through the process again (first put the block into the invisible list, then promote it to visible by removing it from the invisible list once a task finishes successfully), and only after that does the cache become visible again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
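Stated as a predicate (illustrative names, assuming the master tracked invisible blocks instead), the derived visibility rule makes the information loss explicit: once every replica disappears from `blockLocations`, the first conjunct fails and the earlier promotion is forgotten.

```scala
object InvisibleTrackingSketch {
  case class RDDBlockId(rddId: Int, splitIndex: Int)

  // Visibility is derived rather than stored: a block counts as visible iff
  // some replica still exists and the block is not flagged invisible.
  def isVisible(
      blockLocations: Map[RDDBlockId, Set[String]], // block -> executors holding it
      invisibleRDDBlocks: Set[RDDBlockId]
  )(id: RDDBlockId): Boolean =
    blockLocations.get(id).exists(_.nonEmpty) && !invisibleRDDBlocks.contains(id)
}
```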
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1100268004 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1424,6 +1457,16 @@ private[spark] class BlockManager( blockStoreUpdater.save() } + // Check whether a rdd block is visible or not. + private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = { +// If the rdd block visibility information not available in the block manager, +// asking master for the information. +if (blockInfoManager.isRDDBlockVisible(blockId)) { + return true +} +master.isRDDBlockVisible(blockId) Review Comment: > > Once the rdd is removed, a broadcast message will be sent to each BlockManager to clean the cache. > > I think the problem in this way is that it introduces the race condition between the visible cache and the broadcast message. > > `master.isRDDBlockVisible(blockId)` shouldn't be frequently called, right? I thought `blockInfoManager.isRDDBlockVisible(blockId)` would be the most case for us, no? With locality scheduling, I think `master.isRDDBlockVisible(blockId)` should not be called frequently. If that assumption holds, then maybe we don't need to cache the results. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1096527082 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1325,14 +1325,47 @@ private[spark] class BlockManager( blockInfoManager.releaseAllLocksForTask(taskAttemptId) } + /** + * Retrieve the given rdd block if it exists and is visible, otherwise call the provided + * `makeIterator` method to compute the block, persist it, and return its values. + * + * @return either a BlockResult if the block was successfully cached, or an iterator if the block + * could not be cached. + */ + def getOrElseUpdateRDDBlock[T]( + taskId: Long, + blockId: RDDBlockId, + level: StorageLevel, + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { +val isCacheVisible = isRDDBlockVisible(blockId) +var computed: Boolean = false +val getIterator = () => { + computed = true + makeIterator() +} + +val res = getOrElseUpdate(blockId, level, classTag, getIterator) +if (res.isLeft && !isCacheVisible) { + if (!computed) { +// Loaded from cache, re-compute to update accumulators. +makeIterator() + } + // Block exists and not visible, report taskId -> blockId info to master. + master.updateRDDBlockTaskInfo(blockId, taskId) +} + +res + } + /** * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method * to compute the block, persist it, and return its values. * * @return either a BlockResult if the block was successfully cached, or an iterator if the block * could not be cached. */ - def getOrElseUpdate[T]( + private[spark] def getOrElseUpdate[T]( Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1096527017 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1325,14 +1325,47 @@ private[spark] class BlockManager( blockInfoManager.releaseAllLocksForTask(taskAttemptId) } + /** + * Retrieve the given rdd block if it exists and is visible, otherwise call the provided + * `makeIterator` method to compute the block, persist it, and return its values. + * + * @return either a BlockResult if the block was successfully cached, or an iterator if the block + * could not be cached. + */ + def getOrElseUpdateRDDBlock[T]( + taskId: Long, + blockId: RDDBlockId, + level: StorageLevel, + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { +val isCacheVisible = isRDDBlockVisible(blockId) +var computed: Boolean = false +val getIterator = () => { + computed = true + makeIterator() +} + +val res = getOrElseUpdate(blockId, level, classTag, getIterator) +if (res.isLeft && !isCacheVisible) { Review Comment: Updated. ## core/src/test/scala/org/apache/spark/AccumulatorSuite.scala: ## @@ -89,6 +89,38 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex assert(AccumulatorContext.get(10).isEmpty) } + + test("SPARK-41497: accumulators should be reported in the case of task retry with rdd cache") { +// Set up a cluster with 2 executors +val conf = new SparkConf() + .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite") Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1096526943 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1325,14 +1325,47 @@ private[spark] class BlockManager( blockInfoManager.releaseAllLocksForTask(taskAttemptId) } + /** + * Retrieve the given rdd block if it exists and is visible, otherwise call the provided + * `makeIterator` method to compute the block, persist it, and return its values. + * + * @return either a BlockResult if the block was successfully cached, or an iterator if the block + * could not be cached. + */ + def getOrElseUpdateRDDBlock[T]( + taskId: Long, + blockId: RDDBlockId, + level: StorageLevel, + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { +val isCacheVisible = isRDDBlockVisible(blockId) +var computed: Boolean = false +val getIterator = () => { + computed = true + makeIterator() +} + +val res = getOrElseUpdate(blockId, level, classTag, getIterator) +if (res.isLeft && !isCacheVisible) { + if (!computed) { +// Loaded from cache, re-compute to update accumulators. +makeIterator() + } Review Comment: > Right..in this case, the rdd block locations for different data can be attached to the same rdd block id. So the reader could get the different data for the same rdd block, which makes the rdd block data also indeterminate. @Ngone51 do you think we should solve this issue in this PR? It looks like a more general issue with indeterminate computation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1096526714 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1424,6 +1457,16 @@ private[spark] class BlockManager( blockStoreUpdater.save() } + // Check whether a rdd block is visible or not. + private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = { +// If the rdd block visibility information not available in the block manager, +// asking master for the information. +if (blockInfoManager.isRDDBlockVisible(blockId)) { + return true +} +master.isRDDBlockVisible(blockId) Review Comment: One way in my mind is that we cache the results (for visible rdd blocks) in the block manager. Once the rdd is removed, a broadcast message will be sent to each BlockManager to clean the cache. I am wondering whether it is worth doing this: since there is locality scheduling, if tasks get scheduled to the executor where the cached block exists, there will be no calls to the master. Let me know your thoughts about this, thanks. cc @mridulm @Ngone51 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
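A hedged sketch of the executor-side verdict cache being weighed here (structure and names are assumptions, not PR code); the point is that caching "visible" verdicts forces an invalidation path when an RDD is removed:

```scala
import scala.collection.mutable

object VerdictCacheSketch {
  // Executor-side cache of "this block is visible" verdicts, keyed by
  // (rddId, splitIndex). Purely illustrative.
  private val visibleVerdicts = mutable.HashSet.empty[(Int, Int)]

  def markVisible(rddId: Int, split: Int): Unit =
    visibleVerdicts += ((rddId, split))

  // Invoked on every BlockManager when the driver removes an RDD, so stale
  // verdicts do not outlive the blocks they describe.
  def onRemoveRdd(rddId: Int): Unit =
    visibleVerdicts.filterInPlace { case (id, _) => id != rddId }
}
```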
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1096520993 ## core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala: ## @@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint( // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] + // Mapping from task id to the set of rdd blocks which are generated from the task. + private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]] + // Record the visible RDD blocks which have been generated at least from one successful task. + private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId] Review Comment: Just found one problem if we track the invisible RDD blocks. In that case, we would mark an RDD block as visible (i.e. the cache can be used) only when it exists in `blockLocations` and does not exist in `invisibleRDDBlocks`. When `blockLocations` drops the block (which could be caused by an executor loss), we lose the information. Then the newly cached data won't be leveraged as soon as possible (right after the cache is generated and reported to the master). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1090773361 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1325,14 +1325,47 @@ private[spark] class BlockManager( blockInfoManager.releaseAllLocksForTask(taskAttemptId) } + /** + * Retrieve the given rdd block if it exists and is visible, otherwise call the provided + * `makeIterator` method to compute the block, persist it, and return its values. + * + * @return either a BlockResult if the block was successfully cached, or an iterator if the block + * could not be cached. + */ + def getOrElseUpdateRDDBlock[T]( + taskId: Long, + blockId: RDDBlockId, + level: StorageLevel, + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { +val isCacheVisible = isRDDBlockVisible(blockId) +var computed: Boolean = false +val getIterator = () => { + computed = true + makeIterator() +} + +val res = getOrElseUpdate(blockId, level, classTag, getIterator) +if (res.isLeft && !isCacheVisible) { + if (!computed) { +// Loaded from cache, re-compute to update accumulators. +makeIterator() + } Review Comment: > Particularly for unordered or indeterminate computation, this can impact the accumulators generated - and it wont match the result iterator returned. There could be other issues with such scenarios: if we have blocks generated from different task attempts or a cache re-compute, each cached copy may contain different data. Would this also be a problem? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
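To make the concern concrete, here is a toy indeterminate computation (assuming a spark-shell session with a live SparkContext `sc`): two attempts computing the same partition can legitimately emit different orderings, so different task attempts may cache different contents under the same block id.

```scala
import scala.util.Random

// Toy indeterminate RDD: each (re)computation of a partition may emit its
// elements in a different order, so a retry can cache different contents
// under the same RDDBlockId.
val indeterminate = sc.parallelize(1 to 100, numSlices = 4).mapPartitions { it =>
  Random.shuffle(it.toVector).iterator
}
indeterminate.persist()
```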
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1090491731 ## core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala: ## @@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint( // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] + // Mapping from task id to the set of rdd blocks which are generated from the task. + private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]] + // Record the visible RDD blocks which have been generated at least from one successful task. + private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId] Review Comment: That would be better. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1090483687 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1424,6 +1457,16 @@ private[spark] class BlockManager( blockStoreUpdater.save() } + // Check whether a rdd block is visible or not. + private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = { +// If the rdd block visibility information not available in the block manager, +// asking master for the information. +if (blockInfoManager.isRDDBlockVisible(blockId)) { + return true +} +master.isRDDBlockVisible(blockId) Review Comment: Yes, I've thought about this. Then we will add a mechanism to clean the cache. Let me re-think about this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1090482420 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1325,14 +1325,47 @@ private[spark] class BlockManager( blockInfoManager.releaseAllLocksForTask(taskAttemptId) } + /** + * Retrieve the given rdd block if it exists and is visible, otherwise call the provided + * `makeIterator` method to compute the block, persist it, and return its values. + * + * @return either a BlockResult if the block was successfully cached, or an iterator if the block + * could not be cached. + */ + def getOrElseUpdateRDDBlock[T]( + taskId: Long, + blockId: RDDBlockId, + level: StorageLevel, + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { +val isCacheVisible = isRDDBlockVisible(blockId) +var computed: Boolean = false +val getIterator = () => { + computed = true + makeIterator() +} + +val res = getOrElseUpdate(blockId, level, classTag, getIterator) +if (res.isLeft && !isCacheVisible) { + if (!computed) { +// Loaded from cache, re-compute to update accumulators. +makeIterator() + } Review Comment: Thanks, will make the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1090480637 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1325,14 +1325,47 @@ private[spark] class BlockManager( blockInfoManager.releaseAllLocksForTask(taskAttemptId) } + /** + * Retrieve the given rdd block if it exists and is visible, otherwise call the provided + * `makeIterator` method to compute the block, persist it, and return its values. + * + * @return either a BlockResult if the block was successfully cached, or an iterator if the block + * could not be cached. + */ + def getOrElseUpdateRDDBlock[T]( + taskId: Long, + blockId: RDDBlockId, + level: StorageLevel, + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { +val isCacheVisible = isRDDBlockVisible(blockId) +var computed: Boolean = false +val getIterator = () => { + computed = true + makeIterator() +} + +val res = getOrElseUpdate(blockId, level, classTag, getIterator) +if (res.isLeft && !isCacheVisible) { + if (!computed) { +// Loaded from cache, re-compute to update accumulators. +makeIterator() + } + // Block exists and not visible, report taskId -> blockId info to master. + master.updateRDDBlockTaskInfo(blockId, taskId) +} + +res + } + /** * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method * to compute the block, persist it, and return its values. * * @return either a BlockResult if the block was successfully cached, or an iterator if the block * could not be cached. */ - def getOrElseUpdate[T]( + private[spark] def getOrElseUpdate[T]( Review Comment: Sounds more reasonable. Will make the change. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1090479561 ## core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala: ## @@ -180,6 +186,14 @@ private[storage] class BlockInfoManager extends Logging { // -- + private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = { +blockInfoWrappers.containsKey(blockId) && visibleRDDBlocks.contains(blockId) + } Review Comment: Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1090478629 ## core/src/test/scala/org/apache/spark/AccumulatorSuite.scala: ## @@ -89,6 +89,38 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex assert(AccumulatorContext.get(10).isEmpty) } + + test("SPARK-41497: accumulators should be reported in the case of task retry with rdd cache") { +// Set up a cluster with 2 executors +val conf = new SparkConf() + .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite") Review Comment: Thanks, will update the app name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1090477930 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1325,14 +1325,47 @@ private[spark] class BlockManager( blockInfoManager.releaseAllLocksForTask(taskAttemptId) } + /** + * Retrieve the given rdd block if it exists and is visible, otherwise call the provided + * `makeIterator` method to compute the block, persist it, and return its values. + * + * @return either a BlockResult if the block was successfully cached, or an iterator if the block + * could not be cached. + */ + def getOrElseUpdateRDDBlock[T]( + taskId: Long, + blockId: RDDBlockId, + level: StorageLevel, + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { +val isCacheVisible = isRDDBlockVisible(blockId) +var computed: Boolean = false +val getIterator = () => { + computed = true + makeIterator() +} + +val res = getOrElseUpdate(blockId, level, classTag, getIterator) +if (res.isLeft && !isCacheVisible) { Review Comment: Yes, that's true. Will make the change. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1070632750 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1325,6 +1325,64 @@ private[spark] class BlockManager( blockInfoManager.releaseAllLocksForTask(taskAttemptId) } + /** + * Retrieve the given rdd block if it exists and is visible, otherwise call the provided + * `makeIterator` method to compute the block, persist it, and return its values. + * + * @return either a BlockResult if the block was successfully cached, or an iterator if the block + * could not be cached. + */ + def getOrElseUpdateRDDBlock[T]( + taskId: Long, + blockId: RDDBlockId, + level: StorageLevel, + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { +var getIterator = makeIterator + +// Attempt to read the block from local or remote storage. If it's present, then we don't need +// to go through the local-get-or-put path. +if (master.isRDDBlockVisible(blockId)) { // Read from cache only when the block is visible. + get[T](blockId)(classTag) match { +case Some(block) => + return Left(block) +case _ => +// Need to compute the block. + } + // Initially we hold no locks on this block. +} else { + // Need to compute the block, since the block maybe already exists, force + // compute the block here. + val iterator = makeIterator() + getIterator = () => iterator +} + +doPutIterator(blockId, getIterator, level, classTag, keepReadLock = true) match { + case None => +// Report taskId -> blockId relationship to master. +master.updateRDDBlockTaskInfo(blockId, taskId) Review Comment: In the latest implementation, the replication logic stays the same as before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1070632692 ## core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala: ## @@ -76,6 +76,8 @@ class BlockManagerMasterEndpoint( // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] + private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId] + private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]] Review Comment: Thanks, fixed. When a task completes, the records will be cleaned up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org