cloud-fan commented on a change in pull request #27767: [SPARK-31017][TEST][CORE] Test for shuffle requests packaging with different size and numBlocks limit URL: https://github.com/apache/spark/pull/27767#discussion_r386901923
########## File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ########## @@ -254,6 +254,78 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT intercept[FetchFailedException] { iterator.next() } } + test("SPARK-31017: Hit maxBytesInFlight limitation before maxBlocksInFlightPerAddress") { + val blockManager = mock(classOf[BlockManager]) + val localBmId = BlockManagerId("test-client", "test-local-host", 1) + doReturn(localBmId).when(blockManager).blockManagerId + + val remoteBmId1 = BlockManagerId("test-remote-client-1", "test-remote-host1", 1) + val remoteBmId2 = BlockManagerId("test-remote-client-2", "test-remote-host2", 2) + val remoteBmId3 = BlockManagerId("test-remote-client-3", "test-remote-host2", 3) + // set maxBlocksInFlightPerAddress to Int.MaxValue, + // and there will be 3 FetchRequests after initialize and 1000 bytes per request. + val blocksByAddress = Seq( + (remoteBmId1, Seq((ShuffleBlockId(0, 1, 0), 1000L, 0))), + (remoteBmId2, Seq((ShuffleBlockId(1, 1, 0), 1000L, 0))), + (remoteBmId3, Seq((ShuffleBlockId(2, 1, 0), 1000L, 0)))).toIterator + // give empty data to make sure the sent request is always in flight + val transfer = createMockTransfer(Map()) + val taskContext = TaskContext.empty() + val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() + new ShuffleBlockFetcherIterator( + taskContext, + transfer, + blockManager, + blocksByAddress, + (_, in) => in, + 2500L, // allow 2 FetchRequests at most at the same time + Int.MaxValue, + Int.MaxValue, // set maxBlocksInFlightPerAddress to Int.MaxValue + Int.MaxValue, + true, + false, + metrics, + true) + // only the first 2 FetchRequests can be sent, but the last one will + // hit maxBytesInFlight so it won't be sent. 
+ verify(transfer, times(2)).fetchBlocks(any(), any(), any(), any(), any(), any()) + } + + test("SPARK-31017: Hit maxBlocksInFlightPerAddress limitation before maxBytesInFlight") { + val blockManager = mock(classOf[BlockManager]) + val localBmId = BlockManagerId("test-client", "test-local-host", 1) + doReturn(localBmId).when(blockManager).blockManagerId + + val remoteBmId = BlockManagerId("test-remote-client-1", "test-remote-host", 2) + // set maxBlocksInFlightPerAddress to 2, so there will be 2 FetchRequests after initialize. Review comment: can we move this comment to https://github.com/apache/spark/pull/27767/files#diff-d9abbfedd9c2b0aed76c94c91ec0cec2R318 ? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org