viirya commented on a change in pull request #26930: "[SPARK-30290][Core] Count for merged block when fetching continuous blocks in batch". URL: https://github.com/apache/spark/pull/26930#discussion_r361082828
########## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ########## @@ -337,13 +337,17 @@ final class ShuffleBlockFetcherIterator( assertPositiveBlockSize(blockId, size) curBlocks += FetchBlockInfo(blockId, size, mapIndex) curRequestSize += size - if (curRequestSize >= targetRemoteRequestSize || - curBlocks.size >= maxBlocksInFlightPerAddress) { + // For batch fetch, the actual block in flight should count for merged block. + val readyForCollectingBlocks = !doBatchFetch && + curBlocks.size >= maxBlocksInFlightPerAddress + if (curRequestSize >= targetRemoteRequestSize || readyForCollectingBlocks) { // Add this FetchRequest val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks) - collectedRemoteRequests += new FetchRequest(address, mergedBlocks) - logDebug(s"Creating fetch request of $curRequestSize at $address " - + s"with ${mergedBlocks.size} blocks") + mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { mergedBlock => Review comment: Yea, it makes sense. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org