[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20685


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-06 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20685#discussion_r172731294
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -583,8 +587,8 @@ object ShuffleBlockFetcherIterator {
 * Result of a fetch from a remote block successfully.
 * @param blockId block id
 * @param address BlockManager that the block was fetched from.
-   * @param size estimated size of the block, used to calculate bytesInFlight.
-   * Note that this is NOT the exact bytes.
+   * @param size estimated size of the block. Note that this is NOT the exact bytes.
+*Size of remote block is used to calculate bytesInFlight.
--- End diff --

nit: documentation style


---




[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20685#discussion_r172579121
  
--- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("big corrupt blocks will not be retiried") {
+    val corruptStream = mock(classOf[InputStream])
+    when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt"))
+    val corruptBuffer = mock(classOf[ManagedBuffer])
+    when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
+    doReturn(1L).when(corruptBuffer).size()
+
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-client", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+    doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+    val localBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+    )
+
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+    )
+
+    val transfer = mock(classOf[BlockTransferService])
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
--- End diff --

sorry my comment was vague -- I *do* think you can use `createMockTransfer` 
here, since that helper method already exists.

I was just thinking that there may be more we could clean up -- setting up the local & remote BlockManagerId, creating the ShuffleIterator, etc. seems to have a lot of boilerplate across all the tests. But let's not do a pure refactoring of the other tests in this change.


---




[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-06 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20685#discussion_r172492938
  
--- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("big corrupt blocks will not be retiried") {
+    val corruptStream = mock(classOf[InputStream])
+    when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt"))
+    val corruptBuffer = mock(classOf[ManagedBuffer])
+    when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
+    doReturn(1L).when(corruptBuffer).size()
+
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-client", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+    doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+    val localBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+    )
+
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+    )
+
+    val transfer = mock(classOf[BlockTransferService])
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
--- End diff --

Thanks a lot, Imran ~ I can file another PR for the refinement :)


---




[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-06 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20685#discussion_r172492581
  
--- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("big corrupt blocks will not be retiried") {
--- End diff --

I will refine this.


---




[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20685#discussion_r172226910
  
--- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("big corrupt blocks will not be retiried") {
--- End diff --

typo: retried (or maybe "retired", not sure)
though I think a better name would be "big blocks are not checked for corruption"


---




[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20685#discussion_r172232973
  
--- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("big corrupt blocks will not be retiried") {
+    val corruptStream = mock(classOf[InputStream])
+    when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt"))
+    val corruptBuffer = mock(classOf[ManagedBuffer])
+    when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
+    doReturn(1L).when(corruptBuffer).size()
+
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-client", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+    doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+    val localBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+    )
+
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+    )
+
+    val transfer = mock(classOf[BlockTransferService])
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
--- End diff --

you can reuse `createMockTransfer` to simplify this a little.

(actually, a bunch of this test code looks like it could be refactored 
across these tests -- but we can leave that out of this change.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-02-27 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/20685

[SPARK-23524] Big local shuffle blocks should not be checked for corruption.

## What changes were proposed in this pull request?

In the current code, all local blocks are checked for corruption no matter whether they are big or not. The reasons are as below:

1. Size in FetchResult for a local block is set to 0
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L327).
2. SPARK-4105 meant to only check the small blocks (those below a size threshold), but because the reported size of a local block is 0, the size check at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L420
never excludes big local blocks.

We can fix this and avoid the OOM.
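[Editor's note] A minimal sketch of the failure mode described above, in Python rather than Spark's actual Scala. The `max_bytes_in_flight / 3` threshold and the 48 MB default for `spark.reducer.maxSizeInFlight` are assumptions about the fetcher's behavior at the time, not quoted from this PR.

```python
def should_check_corruption(reported_size, max_bytes_in_flight):
    """Only 'small' blocks are fully buffered in memory to detect
    corruption; big blocks would risk OOM if materialized."""
    return reported_size < max_bytes_in_flight / 3


MAX_BYTES_IN_FLIGHT = 48 * 1024 * 1024  # assumed default, 48 MB

# Bug: a local block's size is reported as 0, so even a huge local
# block passes the "small" test and gets buffered in memory.
assert should_check_corruption(0, MAX_BYTES_IN_FLIGHT)

# Fix: report the local block's real length, so big local blocks
# skip the in-memory corruption check.
real_size = 2 * 1024 ** 3  # a 2 GB local block
assert not should_check_corruption(real_size, MAX_BYTES_IN_FLIGHT)
```

With size 0, every local block looks "small" and is copied into memory; passing the real length restores the intent of SPARK-4105.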

## How was this patch tested?

UT added

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-23524

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20685.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20685


commit 535916c045b123e803c0f6dbf786076045036167
Author: jx158167 
Date:   2018-02-27T09:56:38Z

[SPARK-23524] Big local shuffle blocks should not be checked for corruption.




---
