otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r655046020



##########
File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -871,6 +1047,81 @@ final class ShuffleBlockFetcherIterator(
           "Failed to get block " + blockId + ", which is not a shuffle block", 
e)
     }
   }
+
+  /**
+   * All the below methods are used by [[PushBasedFetchHelper]] to communicate 
with the iterator
+   */
+  private[storage] def addToResultsQueue(result: FetchResult): Unit = {
+    results.put(result)
+  }
+
+  private[storage] def incrementNumBlocksToFetch(moreBlocksToFetch: Int): Unit 
= {
+    numBlocksToFetch += moreBlocksToFetch
+  }
+
+  /**
+   * Currently used by [[PushBasedFetchHelper]] to fetch fallback blocks when 
there is a fetch
+   * failure for a shuffle merged block/chunk.
+   * This is executed by the task thread when the `iterator.next()` is invoked 
and if that initiates
+   * fallback.
+   */
+  private[storage] def fetchFallbackBlocks(
+      fallbackBlocksByAddr: Iterator[(BlockManagerId, Seq[(BlockId, Long, 
Int)])]): Unit = {
+    val fallbackLocalBlocks = mutable.LinkedHashSet[(BlockId, Int)]()
+    val fallbackHostLocalBlocksByExecutor =
+      mutable.LinkedHashMap[BlockManagerId, Seq[(BlockId, Long, Int)]]()
+    val fallbackMergedLocalBlocks = mutable.LinkedHashSet[BlockId]()
+    val fallbackRemoteReqs = partitionBlocksByFetchMode(fallbackBlocksByAddr,
+      fallbackLocalBlocks, fallbackHostLocalBlocksByExecutor, 
fallbackMergedLocalBlocks)
+    // Add the remote requests into our queue in a random order
+    fetchRequests ++= Utils.randomize(fallbackRemoteReqs)
+    logInfo(s"Started ${fallbackRemoteReqs.size} fallback remote requests for 
merged")
+    // fetch all the fallback blocks that are local.
+    fetchLocalBlocks(fallbackLocalBlocks)
+    // Merged local blocks should be empty during fallback
+    assert(fallbackMergedLocalBlocks.isEmpty,
+      "There should be zero merged blocks during fallback")
+    // Some of the fallback local blocks could be host local blocks
+    fetchAllHostLocalBlocks(fallbackHostLocalBlocksByExecutor)
+  }
+
+  /**
+   * Removes all the pending shuffle chunks that are on the same host as the 
block chunk that had
+   * a fetch failure.
+   * This is executed by the task thread when the `iterator.next()` is invoked 
and if that initiates
+   * fallback.
+   *
+   * @return set of all the removed shuffle chunk Ids.
+   */
+  private[storage] def removePendingChunks(
+      failedBlockId: ShuffleBlockChunkId,
+      address: BlockManagerId): mutable.HashSet[ShuffleBlockChunkId] = {
+    val removedChunkIds = new mutable.HashSet[ShuffleBlockChunkId]()
+
+    def sameShuffleBlockChunk(block: BlockId): Boolean = {
+      val chunkId = block.asInstanceOf[ShuffleBlockChunkId]
+      chunkId.shuffleId == failedBlockId.shuffleId && chunkId.reduceId == 
failedBlockId.reduceId
+    }
+
+    def filterRequests(queue: mutable.Queue[FetchRequest]): Unit = {
+      val fetchRequestsToRemove = new mutable.Queue[FetchRequest]()
+      fetchRequestsToRemove ++= queue.dequeueAll(req => {
+        val firstBlock = req.blocks.head
+        firstBlock.blockId.isShuffleChunk && req.address.equals(address) &&
+          sameShuffleBlockChunk(firstBlock.blockId)
+      })
+      fetchRequestsToRemove.foreach(req => {
+        removedChunkIds ++= 
req.blocks.iterator.map(_.blockId.asInstanceOf[ShuffleBlockChunkId])
+      })

Review comment:
       I made this change with others.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to