[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/20685

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user Ngone51 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20685#discussion_r172731294

Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

@@ -583,8 +587,8 @@ object ShuffleBlockFetcherIterator {
    * Result of a fetch from a remote block successfully.
    * @param blockId block id
    * @param address BlockManager that the block was fetched from.
-   * @param size estimated size of the block, used to calculate bytesInFlight.
-   *             Note that this is NOT the exact bytes.
+   * @param size estimated size of the block. Note that this is NOT the exact bytes.
+   *             Size of remote block is used to calculate bytesInFlight.

End diff

nit: documentation style
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20685#discussion_r172579121

Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
     intercept[FetchFailedException] { iterator.next() }
   }

+  test("big corrupt blocks will not be retiried") {
+    val corruptStream = mock(classOf[InputStream])
+    when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt"))
+    val corruptBuffer = mock(classOf[ManagedBuffer])
+    when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
+    doReturn(1L).when(corruptBuffer).size()
+
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-client", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+    doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+    val localBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+    )
+
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+    )
+
+    val transfer = mock(classOf[BlockTransferService])
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))

End diff

Sorry, my comment was vague -- I *do* think you can use `createMockTransfer` here, since that helper method already exists. I was just thinking that there may be more we could clean up -- setting up the local & remote BlockManagerId, creating the ShuffleBlockFetcherIterator, etc. seems to involve a lot of boilerplate in all the tests. But let's not do a pure refactoring of the other tests in this change.
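For illustration only, a rough sketch of how the mocked setup above might shrink if the suite's existing `createMockTransfer` helper were reused. The helper's signature (`createMockTransfer(data: Map[BlockId, ManagedBuffer]): BlockTransferService`) is an assumption here and should be checked against the actual suite:

```scala
import java.io.{IOException, InputStream}
import org.mockito.Mockito.{doReturn, mock, when}
import org.apache.spark.network.buffer.ManagedBuffer

// Hypothetical condensed setup, assuming createMockTransfer(data) returns a
// BlockTransferService whose fetchBlocks answers with the given buffers.
val corruptStream = mock(classOf[InputStream])
when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt"))
val corruptBuffer = mock(classOf[ManagedBuffer])
when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
doReturn(1L).when(corruptBuffer).size()

// One line replaces the hand-rolled mock of BlockTransferService.fetchBlocks:
val transfer = createMockTransfer(Map(ShuffleBlockId(0, 1, 0) -> corruptBuffer))
```

The remaining boilerplate (local/remote `BlockManagerId`, constructing the `ShuffleBlockFetcherIterator`) would still be duplicated across tests, which is the larger cleanup deferred to a follow-up PR.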
Github user jinxing64 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20685#discussion_r172492938

Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
+  test("big corrupt blocks will not be retiried") {
[...]
+    val transfer = mock(classOf[BlockTransferService])
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))

End diff

Thanks a lot, Imran~ I can file another PR for the refinement :)
Github user jinxing64 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20685#discussion_r172492581

Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
     intercept[FetchFailedException] { iterator.next() }
   }

+  test("big corrupt blocks will not be retiried") {

End diff

I will refine this.
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20685#discussion_r172226910

Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
     intercept[FetchFailedException] { iterator.next() }
   }

+  test("big corrupt blocks will not be retiried") {

End diff

typo: retried (or maybe "retired", not sure). Though I think a better name would be "big blocks are not checked for corruption".
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20685#discussion_r172232973

Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
+  test("big corrupt blocks will not be retiried") {
[...]
+    val transfer = mock(classOf[BlockTransferService])
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))

End diff

You can reuse `createMockTransfer` to simplify this a little. (Actually, a bunch of this test code looks like it could be refactored across these tests -- but we can leave that out of this change.)
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/20685

[SPARK-23524] Big local shuffle blocks should not be checked for corruption.

## What changes were proposed in this pull request?

In the current code, all local blocks are checked for corruption, no matter whether they are big or not. The reasons are as below:

1. Size in FetchResult for a local block is set to 0 (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L327).
2. SPARK-4105 meant to only check the small blocks (size < maxBytesInFlight / 3), but because of 1, the size check at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L420 always passes for local blocks.

We can fix this and avoid the OOM.

## How was this patch tested?

UT added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-23524

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20685.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20685

commit 535916c045b123e803c0f6dbf786076045036167
Author: jx158167
Date: 2018-02-27T09:56:38Z

    [SPARK-23524] Big local shuffle blocks should not be checked for corruption.
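To make the failure mode concrete, here is a simplified sketch of the kind of size guard SPARK-4105 introduced and why a zero recorded size defeats it. Names and the exact threshold are illustrative, not the actual ShuffleBlockFetcherIterator code:

```scala
// Simplified illustration of the SPARK-4105-style guard: only small blocks
// are fully read into memory to detect corruption, because eagerly reading a
// huge block can cause OOM.
def shouldCheckForCorruption(blockSize: Long, maxBytesInFlight: Long): Boolean =
  blockSize < maxBytesInFlight / 3

val maxBytesInFlight = 48L * 1024 * 1024  // Spark's default maxSizeInFlight is 48m

// Bug: the FetchResult for a local block records size 0, so even a 2 GB local
// block passes the "small enough" test and gets buffered in memory.
val recordedLocalSize = 0L
val actualLocalSize = 2L * 1024 * 1024 * 1024

assert(shouldCheckForCorruption(recordedLocalSize, maxBytesInFlight)) // wrongly checked
assert(!shouldCheckForCorruption(actualLocalSize, maxBytesInFlight))  // intended behavior
```

The fix is to record the real block length for local blocks, so the same guard that already protects big remote blocks also skips big local ones.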