cloud-fan commented on a change in pull request #27767: 
[SPARK-31017][TEST][CORE] Test for shuffle requests packaging with different 
size and numBlocks limit
URL: https://github.com/apache/spark/pull/27767#discussion_r386901923
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ##########
 @@ -254,6 +254,78 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("SPARK-31017: Hit maxBytesInFlight limitation before 
maxBlocksInFlightPerAddress") {
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-local-host", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+
+    val remoteBmId1 = BlockManagerId("test-remote-client-1", 
"test-remote-host1", 1)
+    val remoteBmId2 = BlockManagerId("test-remote-client-2", 
"test-remote-host2", 2)
+    val remoteBmId3 = BlockManagerId("test-remote-client-3", 
"test-remote-host2", 3)
+    // set maxBlocksInFlightPerAddress to Int.MaxValue,
+    // and there will be 3 FetchRequests after initialize and 1000 bytes per 
request.
+    val blocksByAddress = Seq(
+      (remoteBmId1, Seq((ShuffleBlockId(0, 1, 0), 1000L, 0))),
+      (remoteBmId2, Seq((ShuffleBlockId(1, 1, 0), 1000L, 0))),
+      (remoteBmId3, Seq((ShuffleBlockId(2, 1, 0), 1000L, 0)))).toIterator
+    // give empty data to make sure the sent request is always in flight
+    val transfer = createMockTransfer(Map())
+    val taskContext = TaskContext.empty()
+    val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
+    new ShuffleBlockFetcherIterator(
+      taskContext,
+      transfer,
+      blockManager,
+      blocksByAddress,
+      (_, in) => in,
+      2500L, // allow 2 FetchRequests at most at the same time
+      Int.MaxValue,
+      Int.MaxValue, // set maxBlocksInFlightPerAddress to Int.MaxValue
+      Int.MaxValue,
+      true,
+      false,
+      metrics,
+      true)
+    // only the first 2 FetchRequests can be sent, but the last one will
+    // hit maxBytesInFlight so it won't be sent.
+    verify(transfer, times(2)).fetchBlocks(any(), any(), any(), any(), any(), 
any())
+  }
+
+  test("SPARK-31017: Hit maxBlocksInFlightPerAddress limitation before 
maxBytesInFlight") {
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-local-host", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+
+    val remoteBmId = BlockManagerId("test-remote-client-1", 
"test-remote-host", 2)
+    // set maxBlocksInFlightPerAddress to 2, so there will be 2 FetchRequests 
after initialize.
 
 Review comment:
   can we move this comment to 
https://github.com/apache/spark/pull/27767/files#diff-d9abbfedd9c2b0aed76c94c91ec0cec2R318
 ?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to