ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1118028179
########## core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala: ########## @@ -210,6 +220,65 @@ class BlockManagerMasterEndpoint( case StopBlockManagerMaster => context.reply(true) stop() + + case UpdateRDDBlockTaskInfo(blockId, taskId) => + // This is to report the information that a rdd block(with `blockId`) is computed + // and cached by task(with `taskId`). And this happens right after the task finished + // computing/caching the block only when the block is not visible yet. And the rdd + // block will be marked as visible when the corresponding task finished successfully. + context.reply(updateRDDBlockTaskInfo(blockId, taskId)) + + case GetRDDBlockVisibility(blockId) => + // Get the visibility status of a specific rdd block. + context.reply(isRDDBlockVisible(blockId)) + + case UpdateRDDBlockVisibility(taskId, visible) => + // This is to report the information that whether rdd blocks computed by task(with `taskId`) + // can be turned to be visible. This is reported by DAGScheduler right after task completes. + // If the task finished successfully, rdd blocks can be turned to be visible, otherwise rdd + // blocks' visibility status won't change. + context.reply(updateRDDBlockVisibility(taskId, visible)) + } + + private def isRDDBlockVisible(blockId: RDDBlockId): Boolean = { + if (trackingCacheVisibility) { + blockLocations.containsKey(blockId) && + blockLocations.get(blockId).nonEmpty && !invisibleRDDBlocks.contains(blockId) + } else { + // Blocks should always be visible if the feature flag is disabled. + true + } + } + + private def updateRDDBlockVisibility(taskId: Long, visible: Boolean): Unit = { + if (!trackingCacheVisibility) { + // Do nothing if the feature flag is disabled. + return + } + + if (visible) { + tidToRddBlockIds.get(taskId).foreach { blockIds => + blockIds.foreach { blockId => + invisibleRDDBlocks.remove(blockId) + // Ask block managers to update the visibility status. 
+ val msg = MarkRDDBlockAsVisible(blockId) + getLocations(blockId).flatMap(blockManagerInfo.get).foreach { managerInfo => + managerInfo.storageEndpoint.ask[Unit](msg) + } + } + } + } Review Comment: Updated, and this is the JIRA ticket: https://issues.apache.org/jira/browse/SPARK-42582 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org