[spark] branch master updated: [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator

2020-03-26 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 33f532a  [SPARK-31259][CORE] Fix log message about fetch request size 
in ShuffleBlockFetcherIterator
33f532a is described below

commit 33f532a9f201fb9c7895d685b3dce82cf042dc61
Author: yi.wu 
AuthorDate: Thu Mar 26 09:11:13 2020 -0700

[SPARK-31259][CORE] Fix log message about fetch request size in 
ShuffleBlockFetcherIterator

### What changes were proposed in this pull request?

Fix the incorrect logging of `curRequestSize`.

### Why are the changes needed?

In batch mode, `curRequestSize` can be the total size of several block 
groups, so each group should log its own request size instead of the 
total size.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

This change only affects a log message.

Closes #28028 from Ngone51/fix_curRequestSize.

Authored-by: yi.wu 
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index f1a7d88..404e055 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator(
 
   private def createFetchRequest(
   blocks: Seq[FetchBlockInfo],
-  address: BlockManagerId,
-  curRequestSize: Long): FetchRequest = {
-logDebug(s"Creating fetch request of $curRequestSize at $address "
+  address: BlockManagerId): FetchRequest = {
+logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address 
"
   + s"with ${blocks.size} blocks")
 FetchRequest(address, blocks)
   }
@@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator(
   private def createFetchRequests(
   curBlocks: Seq[FetchBlockInfo],
   address: BlockManagerId,
-  curRequestSize: Long,
   isLast: Boolean,
   collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] 
= {
 val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
 var retBlocks = Seq.empty[FetchBlockInfo]
 if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {
-  collectedRemoteRequests += createFetchRequest(mergedBlocks, address, 
curRequestSize)
+  collectedRemoteRequests += createFetchRequest(mergedBlocks, address)
 } else {
   mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>
 if (blocks.length == maxBlocksInFlightPerAddress || isLast) {
-  collectedRemoteRequests += createFetchRequest(blocks, address, 
curRequestSize)
+  collectedRemoteRequests += createFetchRequest(blocks, address)
 } else {
   // The last group does not exceed `maxBlocksInFlightPerAddress`. Put 
it back
   // to `curBlocks`.
@@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator(
   // For batch fetch, the actual block in flight should count for merged 
block.
   val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= 
maxBlocksInFlightPerAddress
   if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
-curBlocks = createFetchRequests(curBlocks, address, curRequestSize, 
isLast = false,
+curBlocks = createFetchRequests(curBlocks, address, isLast = false,
   collectedRemoteRequests).to[ArrayBuffer]
 curRequestSize = curBlocks.map(_.size).sum
   }
 }
 // Add in the final request
 if (curBlocks.nonEmpty) {
-  curBlocks = createFetchRequests(curBlocks, address, curRequestSize, 
isLast = true,
+  curBlocks = createFetchRequests(curBlocks, address, isLast = true,
 collectedRemoteRequests).to[ArrayBuffer]
   curRequestSize = curBlocks.map(_.size).sum
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator

2020-03-26 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 33f532a  [SPARK-31259][CORE] Fix log message about fetch request size 
in ShuffleBlockFetcherIterator
33f532a is described below

commit 33f532a9f201fb9c7895d685b3dce82cf042dc61
Author: yi.wu 
AuthorDate: Thu Mar 26 09:11:13 2020 -0700

[SPARK-31259][CORE] Fix log message about fetch request size in 
ShuffleBlockFetcherIterator

### What changes were proposed in this pull request?

Fix the incorrect logging of `curRequestSize`.

### Why are the changes needed?

In batch mode, `curRequestSize` can be the total size of several block 
groups, so each group should log its own request size instead of the 
total size.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

This change only affects a log message.

Closes #28028 from Ngone51/fix_curRequestSize.

Authored-by: yi.wu 
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index f1a7d88..404e055 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator(
 
   private def createFetchRequest(
   blocks: Seq[FetchBlockInfo],
-  address: BlockManagerId,
-  curRequestSize: Long): FetchRequest = {
-logDebug(s"Creating fetch request of $curRequestSize at $address "
+  address: BlockManagerId): FetchRequest = {
+logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address 
"
   + s"with ${blocks.size} blocks")
 FetchRequest(address, blocks)
   }
@@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator(
   private def createFetchRequests(
   curBlocks: Seq[FetchBlockInfo],
   address: BlockManagerId,
-  curRequestSize: Long,
   isLast: Boolean,
   collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] 
= {
 val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
 var retBlocks = Seq.empty[FetchBlockInfo]
 if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {
-  collectedRemoteRequests += createFetchRequest(mergedBlocks, address, 
curRequestSize)
+  collectedRemoteRequests += createFetchRequest(mergedBlocks, address)
 } else {
   mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>
 if (blocks.length == maxBlocksInFlightPerAddress || isLast) {
-  collectedRemoteRequests += createFetchRequest(blocks, address, 
curRequestSize)
+  collectedRemoteRequests += createFetchRequest(blocks, address)
 } else {
   // The last group does not exceed `maxBlocksInFlightPerAddress`. Put 
it back
   // to `curBlocks`.
@@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator(
   // For batch fetch, the actual block in flight should count for merged 
block.
   val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= 
maxBlocksInFlightPerAddress
   if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
-curBlocks = createFetchRequests(curBlocks, address, curRequestSize, 
isLast = false,
+curBlocks = createFetchRequests(curBlocks, address, isLast = false,
   collectedRemoteRequests).to[ArrayBuffer]
 curRequestSize = curBlocks.map(_.size).sum
   }
 }
 // Add in the final request
 if (curBlocks.nonEmpty) {
-  curBlocks = createFetchRequests(curBlocks, address, curRequestSize, 
isLast = true,
+  curBlocks = createFetchRequests(curBlocks, address, isLast = true,
 collectedRemoteRequests).to[ArrayBuffer]
   curRequestSize = curBlocks.map(_.size).sum
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org