ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1113157159
########## core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala: ##########
@@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging {
     try {
       val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
       while (true) {
-        val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+        val previous = invisibleRDDBlocks.synchronized {
+          val res = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+          if (res == null && trackingCacheVisibility) {
+            // Added to invisible blocks if it doesn't exist before.
+            blockId.asRDDId.foreach(invisibleRDDBlocks.add)

Review Comment:
Currently, we decide whether a block is visible by reading both `blockInfoWrappers` and `invisibleRDDBlocks`, so any change to that state has to happen inside the synchronized block to avoid concurrency issues: `invisibleRDDBlocks` must be updated in the same synchronized block in which the new entry is put into `blockInfoWrappers`. Otherwise a reader could see the block in `blockInfoWrappers` before it has been added to `invisibleRDDBlocks` and wrongly treat it as visible. It is simpler to keep the synchronized block here.

```
private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
  if (trackingCacheVisibility) {
    invisibleRDDBlocks.synchronized {
      blockInfoWrappers.containsKey(blockId) && !invisibleRDDBlocks.contains(blockId)
    }
  } else {
    // Always visible if the feature flag is disabled.
    true
  }
}
```
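To make the invariant concrete, here is a minimal Scala sketch under a simplified model. `VisibilityTracker`, `register`, `markVisible`, and `isVisible` are hypothetical names, not Spark's API; block ids are plain strings instead of `RDDBlockId`, and the map value is a placeholder for `BlockInfoWrapper`. Both structures are mutated and read under one lock, mirroring `invisibleRDDBlocks.synchronized` in the diff.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Hypothetical, simplified model of the visibility bookkeeping discussed above.
class VisibilityTracker {
  // Stands in for blockInfoWrappers: every registered block.
  private val blocks = new ConcurrentHashMap[String, AnyRef]()
  // Stands in for invisibleRDDBlocks: registered blocks not yet visible.
  private val invisible = mutable.HashSet.empty[String]

  /** Registers a block; returns true if it was newly added. */
  def register(id: String): Boolean = invisible.synchronized {
    val previous = blocks.putIfAbsent(id, new Object)
    if (previous == null) {
      // Both updates happen under the same lock, so no reader can observe
      // the block in `blocks` without also finding it in `invisible`.
      invisible.add(id)
    }
    previous == null
  }

  /** Hypothetical: marks a registered block as visible. */
  def markVisible(id: String): Unit = invisible.synchronized {
    invisible.remove(id)
    ()
  }

  /** Visible only if registered and no longer in the invisible set. */
  def isVisible(id: String): Boolean = invisible.synchronized {
    blocks.containsKey(id) && !invisible.contains(id)
  }
}
```

Because `isVisible` takes the same lock as `register`, a reader never sees the intermediate state where a block is registered but not yet recorded as invisible; without the shared lock, `isVisible` could briefly return `true` for a block that was just put into the map.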