[GitHub] [spark] cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] Count for merged block when fetching continuous blocks in batch

2019-12-23 Thread GitBox
cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] 
Count for merged block when fetching continuous blocks in batch
URL: https://github.com/apache/spark/pull/26930#discussion_r361089166
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -337,14 +337,25 @@ final class ShuffleBlockFetcherIterator(
   assertPositiveBlockSize(blockId, size)
   curBlocks += FetchBlockInfo(blockId, size, mapIndex)
   curRequestSize += size
-  if (curRequestSize >= targetRemoteRequestSize ||
-  curBlocks.size >= maxBlocksInFlightPerAddress) {
+  // For batch fetch, the actual block in flight should count for merged block.
+  val exceedsMaxBlocksInFlightPerAddress = !doBatchFetch &&
+curBlocks.size >= maxBlocksInFlightPerAddress
+  if (curRequestSize >= targetRemoteRequestSize || exceedsMaxBlocksInFlightPerAddress) {
 // Add this FetchRequest
 val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
-collectedRemoteRequests += new FetchRequest(address, mergedBlocks)
-logDebug(s"Creating fetch request of $curRequestSize at $address "
-  + s"with ${mergedBlocks.size} blocks")
+  .grouped(maxBlocksInFlightPerAddress)
 curBlocks = new ArrayBuffer[FetchBlockInfo]
+mergedBlocks.foreach { mergedBlock =>
+  if (mergedBlock.size == maxBlocksInFlightPerAddress) {
+collectedRemoteRequests += new FetchRequest(address, mergedBlock)
+logDebug(s"Creating fetch request of $curRequestSize at $address "
+  + s"with ${mergedBlock.size} blocks")
+  } else {
+// The last group might not exceed `maxBlocksInFlightPerAddress`. Put it back
 
 Review comment:
   When we reach here, it's not `might not exceed`; it's `does not exceed`.
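   As a side note for readers following this thread, here is a minimal, self-contained Scala sketch of the grouping behaviour under review: every full group of `maxBlocksInFlightPerAddress` merged blocks becomes a fetch request right away, and only the trailing group (which, as noted above, does not reach the limit) is carried back. The case classes and names below are simplified stand-ins, not the actual Spark classes.

   import scala.collection.mutable.ArrayBuffer

   // Illustrative stand-ins; the real Spark classes carry more fields.
   case class FetchBlockInfo(blockId: String, size: Long)
   case class FetchRequest(address: String, blocks: Seq[FetchBlockInfo])

   // Group merged blocks into chunks of at most `maxBlocksInFlightPerAddress`.
   // Every full chunk becomes a FetchRequest; only the trailing chunk can be
   // smaller, and that one is handed back to be carried over as `curBlocks`.
   def collectRequests(
       address: String,
       mergedBlocks: Seq[FetchBlockInfo],
       maxBlocksInFlightPerAddress: Int): (Seq[FetchRequest], Seq[FetchBlockInfo]) = {
     val requests = ArrayBuffer.empty[FetchRequest]
     var carryOver: Seq[FetchBlockInfo] = Seq.empty
     mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { group =>
       if (group.size == maxBlocksInFlightPerAddress) {
         requests += FetchRequest(address, group)
       } else {
         carryOver = group  // the last, short group
       }
     }
     (requests.toSeq, carryOver)
   }

   // Example: 7 merged blocks with a limit of 3 per address
   // => 2 full requests and 1 block carried over.
   val blocks = (1 to 7).map(i => FetchBlockInfo(s"shuffle_0_${i}_0", 1024L))
   val (requests, carried) = collectRequests("host-1:7337", blocks, 3)
   // requests.size == 2, carried.size == 1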





[GitHub] [spark] cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] Count for merged block when fetching continuous blocks in batch

2019-12-22 Thread GitBox
cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] 
Count for merged block when fetching continuous blocks in batch
URL: https://github.com/apache/spark/pull/26930#discussion_r360783984
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -337,13 +337,17 @@ final class ShuffleBlockFetcherIterator(
   assertPositiveBlockSize(blockId, size)
   curBlocks += FetchBlockInfo(blockId, size, mapIndex)
   curRequestSize += size
-  if (curRequestSize >= targetRemoteRequestSize ||
-  curBlocks.size >= maxBlocksInFlightPerAddress) {
+  // For batch fetch, the actual block in flight should count for merged block.
+  val readyForCollectingBlocks = !doBatchFetch &&
+curBlocks.size >= maxBlocksInFlightPerAddress
+  if (curRequestSize >= targetRemoteRequestSize || readyForCollectingBlocks) {
 // Add this FetchRequest
 val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
-collectedRemoteRequests += new FetchRequest(address, mergedBlocks)
-logDebug(s"Creating fetch request of $curRequestSize at $address "
-  + s"with ${mergedBlocks.size} blocks")
+mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { mergedBlock =>
 
 Review comment:
   For the last group, it may exceed neither `targetRemoteRequestSize` nor `maxBlocksInFlightPerAddress`. Shall we put it back into `curBlocks`?
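   As a small aside on why only the last group can fall short: `grouped` never pads, so every chunk it emits is full except possibly the final one. A minimal illustration with placeholder values:

   // `grouped` emits full chunks and, at most, one shorter trailing chunk.
   val chunks = List.range(1, 11).grouped(3).toList
   // chunks == List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9), List(10))
   chunks.last.size  // 1 -- analogous to the leftover blocks that would go back into `curBlocks`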





[GitHub] [spark] cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] Count for merged block when fetching continuous blocks in batch

2019-12-22 Thread GitBox
cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] 
Count for merged block when fetching continuous blocks in batch
URL: https://github.com/apache/spark/pull/26930#discussion_r360782001
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -337,13 +337,17 @@ final class ShuffleBlockFetcherIterator(
   assertPositiveBlockSize(blockId, size)
   curBlocks += FetchBlockInfo(blockId, size, mapIndex)
   curRequestSize += size
-  if (curRequestSize >= targetRemoteRequestSize ||
-  curBlocks.size >= maxBlocksInFlightPerAddress) {
+  // For batch fetch, the actual block in flight should count for merged block.
+  val readyForCollectingBlocks = !doBatchFetch &&
 
 Review comment:
   nit: `exceedsMaxBlocksInFlightPerAddress`

