mridulm commented on a change in pull request #33446: URL: https://github.com/apache/spark/pull/33446#discussion_r780737343
##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -281,6 +292,13 @@ final class ShuffleBlockFetcherIterator(
         }
       }

+    @inline def logFetchOnCompletionIfSlow(): Unit = {
+      if (remainingBlocks.isEmpty) {
+        logFetchIfSlow(TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - requestStartTime),
+          infoMap.values.map(_._1).sum, blockIds.size, address)

Review comment:
   Use `req.size` instead.

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -342,6 +362,7 @@ final class ShuffleBlockFetcherIterator(
           val block = BlockId(blockId)
           if (block.isShuffleChunk) {
             remainingBlocks -= blockId
+            logFetchOnCompletionIfSlow()

Review comment:
   Same here as the other failed case.

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -291,6 +309,7 @@ final class ShuffleBlockFetcherIterator(
             // This needs to be released after use.
             buf.retain()
             remainingBlocks -= blockId
+            logFetchOnCompletionIfSlow()
             blockOOMRetryCounts.remove(blockId)
             results.put(new SuccessFetchResult(BlockId(blockId), infoMap(blockId)._2, address,
               infoMap(blockId)._1, buf, remainingBlocks.isEmpty))

Review comment:
   Make `remainingBlocks.isEmpty` into a local variable `networkReqDone` and use that both for `SuccessFetchResult` and for deciding whether to invoke `logFetchOnCompletionIfSlow` (and then you can remove the `remainingBlocks.isEmpty` check inside that method).

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -334,6 +353,7 @@ final class ShuffleBlockFetcherIterator(
                 s"due to Netty OOM, will retry")
             }
             remainingBlocks -= blockId
+            logFetchOnCompletionIfSlow()

Review comment:
   If we are calling `logFetchOnCompletionIfSlow` from here, we should recompute `totalRequestSize` and `blockCount`, since the fetch failed before all blocks were fetched. Essentially, we need to compute the stats based on `infoMap.keys -- remainingBlocks`.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
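A minimal sketch of the two refactors the review suggests, outside the context of the actual `ShuffleBlockFetcherIterator`: hoisting `remainingBlocks.isEmpty` into a local `networkReqDone`, and computing failure-path stats over the blocks already fetched (`infoMap.keys -- remainingBlocks`). The object, method names, and simplified map values here are illustrative stand-ins, not Spark's real code:

```scala
import scala.collection.mutable

// Hypothetical standalone illustration of the review suggestions; the real
// logic lives inside the per-request fetch listener in Spark.
object FetchLoggingSketch {
  // blockId -> (size, mapIndex), mirroring infoMap in the diff.
  val infoMap = mutable.HashMap[String, (Long, Int)]()
  val remainingBlocks = mutable.HashSet[String]()

  // Success path: evaluate remainingBlocks.isEmpty once, reuse the result
  // both for the slow-fetch log decision and for SuccessFetchResult.
  def onBlockFetched(blockId: String): Boolean = {
    remainingBlocks -= blockId
    val networkReqDone = remainingBlocks.isEmpty // local, per the review
    if (networkReqDone) {
      logFetchOnCompletionIfSlow()
    }
    networkReqDone // would be passed into SuccessFetchResult
  }

  // Failure path: recompute totalRequestSize and blockCount from the blocks
  // actually fetched so far, i.e. infoMap.keys -- remainingBlocks.
  def fetchedStats(): (Long, Int) = {
    val fetched = infoMap.keySet -- remainingBlocks
    val totalRequestSize = fetched.iterator.map(infoMap(_)._1).sum
    (totalRequestSize, fetched.size)
  }

  // Stand-in for the logging helper; body elided.
  def logFetchOnCompletionIfSlow(): Unit = ()
}
```

With `networkReqDone` hoisted, the `remainingBlocks.isEmpty` check inside `logFetchOnCompletionIfSlow` becomes redundant and the method is only invoked when the request is actually complete, as the reviewer notes.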