otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r648594657



##########
File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator {
    */
   private[storage]
   case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends 
FetchResult
+
+  /**
+   * Result of an unsuccessful fetch of a remote merged block.
+   * Instead of treating this as a FailureFetchResult, we ignore this failure
+   * and fall back to fetching the original unmerged blocks.
+   * @param blockId block id
+   * @param address BlockManager that the merged block was attempted to be fetched from
+   * @param size size of the block, used to update bytesInFlight.
+   * @param isNetworkReqDone Whether this is the last network request for this host in this
+   *                         fetch request. Used to update reqsInFlight.
+   */
+  private[storage] case class IgnoreFetchResult(blockId: BlockId,
+      address: BlockManagerId,
+      size: Long,
+      isNetworkReqDone: Boolean) extends FetchResult
+
+  /**
+   * Result of a successful fetch of meta information for a merged block.
+   *
+   * @param shuffleId        shuffle id.
+   * @param reduceId         reduce id.
+   * @param blockSize        size of each merged block.
+   * @param numChunks        number of chunks in the merged block.
+   * @param bitmaps          bitmaps for every chunk.
+   * @param address          BlockManager that the merged status was fetched from.
+   */
+  private[storage] case class MergedBlocksMetaFetchResult(
+      shuffleId: Int,
+      reduceId: Int,
+      blockSize: Long,
+      numChunks: Int,
+      bitmaps: Array[RoaringBitmap],
+      address: BlockManagerId,
+      blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+  /**
+   * Result of a failure while fetching the meta information for a merged block.
+   *
+   * @param shuffleId shuffle id.
+   * @param reduceId  reduce id.
+   * @param address   BlockManager that the merged status was fetched from.
+   */
+  private[storage] case class MergedBlocksMetaFailedFetchResult(
+      shuffleId: Int,
+      reduceId: Int,
+      address: BlockManagerId,
+      blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+}
+
+/**
+ * Helper class that encapsulates all the push-based functionality to fetch merged block meta
+ * and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(

Review comment:
       I will work on separating this into its own file. As for making this a
trait, do you mean that we should define an API for `PushBasedFetchHelper` so
that there could be different implementations? If so, I am not sure it would
add much value. Right now, the implementation and the methods in this class
are very closely tied to the implementation of push-based shuffle, so the API
is not generic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to