otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r648520952



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -767,6 +908,43 @@ final class ShuffleBlockFetcherIterator(
             deferredFetchRequests.getOrElseUpdate(address, new Queue[FetchRequest]())
           defReqQueue.enqueue(request)
           result = null
+
+        case IgnoreFetchResult(blockId, address, size, isNetworkReqDone) =>
+          if (pushBasedFetchHelper.isNotExecutorOrMergedLocal(address)) {
+            numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+            bytesInFlight -= size
+          }
+          if (isNetworkReqDone) {
+            reqsInFlight -= 1
+            logDebug("Number of requests in flight " + reqsInFlight)
+          }
+          numBlocksProcessed += pushBasedFetchHelper.initiateFallbackBlockFetchForMergedBlock(
+            blockId, address)
+          // Set result to null to trigger another iteration of the while loop to get either
+          // a SuccessFetchResult or a FailureFetchResult.
+          result = null
+
+        case MergedBlocksMetaFetchResult(shuffleId, reduceId, blockSize, numChunks, bitmaps,
+        address, _) =>
+          // The original meta request is processed so we decrease numBlocksToFetch by 1. We will
+          // collect new chunks request and the count of this is added to numBlocksToFetch in
+          // collectFetchReqsFromMergedBlocks.
+          numBlocksToFetch -= 1
+          val blocksToRequest = pushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(
+            shuffleId, reduceId, blockSize, numChunks, bitmaps)
+          val additionalRemoteReqs = new ArrayBuffer[FetchRequest]
+          collectFetchRequests(address, blocksToRequest.toSeq, additionalRemoteReqs)
+          fetchRequests ++= additionalRemoteReqs
+          // Set result to null to force another iteration.
+          result = null

Review comment:
       > Hm..is it possible there's only FetchRequest(hasMergedBlocks) at the 
beginning? In that case, it seems to cause the fetching process to hang.
   
   It will not cause the fetch process to hang if there is just one FetchRequest with merged blocks.
   For example, suppose the only FetchRequest is for a merged block `ShuffleBlock(0, -1, 0)`:
   - The iterator sends out the request to fetch the metadata for this block in `PushBasedFetchHelper.sendFetchMergedStatusRequest`.
   - The iterator waits for a response in the result queue at `results.take()`.
   - Once it receives a response, which is either `MergedBlocksMetaFetchResult` or `MergedBlocksMetaFailedFetchResult`, it adds more FetchRequests to the fetch queue and sets `result = null`.
   - `fetchUpToMaxBytes()` is always called after processing the response, so the newly queued FetchRequests get sent out (subject to the in-flight limits).
   - Since `result = null`, the while loop repeats and waits again for a response in the result queue.
   
   I will also add a UT for this case to verify this.
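
To make the steps above concrete, here is a rough, self-contained sketch of the loop. It is not the Spark code: `MetaRequest`, `ChunkRequest`, `MetaResult`, `SuccessResult` and `drain` are hypothetical stand-ins, responses are assumed to arrive synchronously, and there are no in-flight limits. It only shows why a lone meta request does not leave `results.take()` blocked forever.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object FetchLoopSketch {
  // Hypothetical, simplified stand-ins for the iterator's result and request types.
  sealed trait FetchResult
  final case class MetaResult(numChunks: Int) extends FetchResult
  final case class SuccessResult(blockId: String) extends FetchResult

  sealed trait FetchRequest
  case object MetaRequest extends FetchRequest
  final case class ChunkRequest(blockId: String) extends FetchRequest

  /** Runs the simplified loop and returns block ids in the order they would be handed out. */
  def drain(initial: Seq[FetchRequest]): Seq[String] = {
    val results = new LinkedBlockingQueue[FetchResult]()
    val fetchRequests = mutable.Queue.empty[FetchRequest]
    fetchRequests ++= initial
    var numBlocksToFetch = initial.size
    var numBlocksProcessed = 0
    val out = mutable.ArrayBuffer.empty[String]

    // Stand-in for fetchUpToMaxBytes(): "sends" every queued request.
    // Assumption: each response lands in the result queue immediately.
    def fetchUpToMaxBytes(): Unit =
      while (fetchRequests.nonEmpty) {
        fetchRequests.dequeue() match {
          case MetaRequest      => results.put(MetaResult(numChunks = 2))
          case ChunkRequest(id) => results.put(SuccessResult(id))
        }
      }

    fetchUpToMaxBytes() // the initial send

    while (numBlocksProcessed < numBlocksToFetch) {
      var result: FetchResult = null
      while (result == null) {
        result = results.take() // blocks until a response is available
        result match {
          case MetaResult(n) =>
            // The meta request itself is done; its chunks are counted instead.
            numBlocksToFetch -= 1
            val chunks = (0 until n).map(i => ChunkRequest(s"chunk_$i"))
            numBlocksToFetch += chunks.size
            fetchRequests ++= chunks
            result = null // force another iteration
          case SuccessResult(id) =>
            numBlocksProcessed += 1
            out += id
        }
        // Always called after processing a response, so the chunk requests
        // queued above are sent before the next results.take().
        fetchUpToMaxBytes()
      }
    }
    out.toSeq
  }
}
```

Calling `FetchLoopSketch.drain(Seq(FetchLoopSketch.MetaRequest))` terminates and yields the two chunk ids. If the `fetchUpToMaxBytes()` call inside the inner loop were removed, the second `results.take()` would block, which is the hang the question is about.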



