[spark] branch master updated: [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 33f532a [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator 33f532a is described below commit 33f532a9f201fb9c7895d685b3dce82cf042dc61 Author: yi.wu AuthorDate: Thu Mar 26 09:11:13 2020 -0700 [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator ### What changes were proposed in this pull request? Fix the incorrect logging of `curRequestSize`. ### Why are the changes needed? In batch mode, `curRequestSize` can be the total size of several block groups, but each group should log its own request size instead of the total size. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? It only affects the log message. Closes #28028 from Ngone51/fix_curRequestSize. 
Authored-by: yi.wu Signed-off-by: Dongjoon Hyun --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++ 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index f1a7d88..404e055 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequest( blocks: Seq[FetchBlockInfo], - address: BlockManagerId, - curRequestSize: Long): FetchRequest = { -logDebug(s"Creating fetch request of $curRequestSize at $address " + address: BlockManagerId): FetchRequest = { +logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address " + s"with ${blocks.size} blocks") FetchRequest(address, blocks) } @@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequests( curBlocks: Seq[FetchBlockInfo], address: BlockManagerId, - curRequestSize: Long, isLast: Boolean, collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = { val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks) var retBlocks = Seq.empty[FetchBlockInfo] if (mergedBlocks.length <= maxBlocksInFlightPerAddress) { - collectedRemoteRequests += createFetchRequest(mergedBlocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(mergedBlocks, address) } else { mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks => if (blocks.length == maxBlocksInFlightPerAddress || isLast) { - collectedRemoteRequests += createFetchRequest(blocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(blocks, address) } else { // The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back // to `curBlocks`. 
@@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator( // For batch fetch, the actual block in flight should count for merged block. val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) { -curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = false, +curBlocks = createFetchRequests(curBlocks, address, isLast = false, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } } // Add in the final request if (curBlocks.nonEmpty) { - curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = true, + curBlocks = createFetchRequests(curBlocks, address, isLast = true, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 33f532a [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator 33f532a is described below commit 33f532a9f201fb9c7895d685b3dce82cf042dc61 Author: yi.wu AuthorDate: Thu Mar 26 09:11:13 2020 -0700 [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator ### What changes were proposed in this pull request? Fix the incorrect logging of `curRequestSize`. ### Why are the changes needed? In batch mode, `curRequestSize` can be the total size of several block groups, but each group should log its own request size instead of the total size. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? It only affects the log message. Closes #28028 from Ngone51/fix_curRequestSize. 
Authored-by: yi.wu Signed-off-by: Dongjoon Hyun --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++ 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index f1a7d88..404e055 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequest( blocks: Seq[FetchBlockInfo], - address: BlockManagerId, - curRequestSize: Long): FetchRequest = { -logDebug(s"Creating fetch request of $curRequestSize at $address " + address: BlockManagerId): FetchRequest = { +logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address " + s"with ${blocks.size} blocks") FetchRequest(address, blocks) } @@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequests( curBlocks: Seq[FetchBlockInfo], address: BlockManagerId, - curRequestSize: Long, isLast: Boolean, collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = { val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks) var retBlocks = Seq.empty[FetchBlockInfo] if (mergedBlocks.length <= maxBlocksInFlightPerAddress) { - collectedRemoteRequests += createFetchRequest(mergedBlocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(mergedBlocks, address) } else { mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks => if (blocks.length == maxBlocksInFlightPerAddress || isLast) { - collectedRemoteRequests += createFetchRequest(blocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(blocks, address) } else { // The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back // to `curBlocks`. 
@@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator( // For batch fetch, the actual block in flight should count for merged block. val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) { -curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = false, +curBlocks = createFetchRequests(curBlocks, address, isLast = false, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } } // Add in the final request if (curBlocks.nonEmpty) { - curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = true, + curBlocks = createFetchRequests(curBlocks, address, isLast = true, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org