mridulm commented on a change in pull request #33446:
URL: https://github.com/apache/spark/pull/33446#discussion_r780737343



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -281,6 +292,13 @@ final class ShuffleBlockFetcherIterator(
       }
     }
 
+    @inline def logFetchOnCompletionIfSlow(): Unit = {
+      if (remainingBlocks.isEmpty) {
+        logFetchIfSlow(TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - requestStartTime),
+          infoMap.values.map(_._1).sum, blockIds.size, address)

Review comment:
       Use `req.size` instead.
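
       Roughly the shape I have in mind (just a sketch; it assumes `req: FetchRequest` from the enclosing `sendRequest`, where this listener is created, is still in scope here, and that `logFetchIfSlow` keeps the signature used in this diff):

```scala
// Sketch only, inside the existing listener in sendRequest: req.size already
// holds the total bytes of this request, so we avoid re-summing infoMap on
// every invocation.
@inline def logFetchOnCompletionIfSlow(): Unit = {
  if (remainingBlocks.isEmpty) {
    val durationMs = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - requestStartTime)
    logFetchIfSlow(durationMs, req.size, blockIds.size, address)
  }
}
```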

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -342,6 +362,7 @@ final class ShuffleBlockFetcherIterator(
               val block = BlockId(blockId)
               if (block.isShuffleChunk) {
                 remainingBlocks -= blockId
+                logFetchOnCompletionIfSlow()

Review comment:
       Same comment here as for the other failure case.

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -291,6 +309,7 @@ final class ShuffleBlockFetcherIterator(
             // This needs to be released after use.
             buf.retain()
             remainingBlocks -= blockId
+            logFetchOnCompletionIfSlow()
             blockOOMRetryCounts.remove(blockId)
            results.put(new SuccessFetchResult(BlockId(blockId), infoMap(blockId)._2,
               address, infoMap(blockId)._1, buf, remainingBlocks.isEmpty))

Review comment:
       Make `remainingBlocks.isEmpty` a local variable `networkReqDone` and use
it both for the `SuccessFetchResult` and to decide whether to invoke
`logFetchOnCompletionIfSlow` (the `remainingBlocks.isEmpty` check inside that
method can then be removed).
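
       For the success branch shown above, something like this (a sketch only; `networkReqDone` is the new name, everything else comes from the existing code in this hunk):

```scala
// Sketch only: compute the completion flag once and reuse it, so the helper
// no longer needs its own remainingBlocks.isEmpty check.
remainingBlocks -= blockId
val networkReqDone = remainingBlocks.isEmpty
if (networkReqDone) {
  logFetchOnCompletionIfSlow()
}
blockOOMRetryCounts.remove(blockId)
results.put(new SuccessFetchResult(BlockId(blockId), infoMap(blockId)._2,
  address, infoMap(blockId)._1, buf, networkReqDone))
```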

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -334,6 +353,7 @@ final class ShuffleBlockFetcherIterator(
                     s"due to Netty OOM, will retry")
                 }
                 remainingBlocks -= blockId
+                logFetchOnCompletionIfSlow()

Review comment:
       If we are calling `logFetchOnCompletionIfSlow` from here, we should
recompute `totalRequestSize` and `blockCount`, since the fetch failed before
all blocks were fetched.
   Essentially, we need to compute the stats based on
`infoMap.keys -- remainingBlocks`.
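
       For illustration, a sketch of what I mean (assumes `infoMap` still maps a block id to `(size, mapIndex)` and that `logFetchIfSlow` takes duration, total size, block count and address as in this diff; `fetchedBlockIds` and `fetchedBytes` are just illustrative names):

```scala
// Sketch only: on this failure path, report stats for the blocks that were
// actually fetched rather than for the whole request.
val fetchedBlockIds = infoMap.keySet -- remainingBlocks
val fetchedBytes = fetchedBlockIds.iterator.map(id => infoMap(id)._1).sum
logFetchIfSlow(
  TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - requestStartTime),
  fetchedBytes, fetchedBlockIds.size, address)
```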




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


