otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r648594657
########## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ########## @@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator { */ private[storage] case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends FetchResult + + /** + * Result of an unsuccessful fetch of a remote merged block. + * Instead of treating this as a FailureFetchResult, we ignore this failure + * and fall back to fetching the original unmerged blocks. + * @param blockId block id + * @param address BlockManager that the merged block was attempted to be fetched from + * @param size size of the block, used to update bytesInFlight. + * @param isNetworkReqDone Is this the last network request for this host in this fetch + * request. Used to update reqsInFlight. + */ + private[storage] case class IgnoreFetchResult(blockId: BlockId, + address: BlockManagerId, + size: Long, + isNetworkReqDone: Boolean) extends FetchResult + + /** + * Result of a successful fetch of meta information for a merged block. + * + * @param shuffleId shuffle id. + * @param reduceId reduce id. + * @param blockSize size of each merged block. + * @param numChunks number of chunks in the merged block. + * @param bitmaps bitmaps for every chunk. + * @param address BlockManager that the merged status was fetched from. + */ + private[storage] case class MergedBlocksMetaFetchResult( + shuffleId: Int, + reduceId: Int, + blockSize: Long, + numChunks: Int, + bitmaps: Array[RoaringBitmap], + address: BlockManagerId, + blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult + + /** + * Result of a failure while fetching the meta information for a merged block. + * + * @param shuffleId shuffle id. + * @param reduceId reduce id. + * @param address BlockManager from which the merged status fetch was attempted.
+ */ + private[storage] case class MergedBlocksMetaFailedFetchResult( + shuffleId: Int, + reduceId: Int, + address: BlockManagerId, + blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult +} + +/** + * Helper class that encapsulates all the push-based functionality to fetch merged block meta + * and merged shuffle block chunks. + */ +private class PushBasedFetchHelper( Review comment: I will work on separating this into its own file. For making this a trait, do you mean that we define APIs for `PushBasedFetchHelper` so there could be different implementations? If yes, I am not sure it will add much value. Right now, this implementation and the methods in this class are very closely tied to the implementation of push merge shuffle. The API is not generic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org