[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21212

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21212#discussion_r186315362

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---

    @@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator(
         // at most maxBytesInFlight in order to limit the amount of data in flight.
         val remoteRequests = new ArrayBuffer[FetchRequest]
    -    // Tracks total number of blocks (including zero sized blocks)
    -    var totalBlocks = 0
         for ((address, blockInfos) <- blocksByAddress) {
    -      totalBlocks += blockInfos.size
           if (address.executorId == blockManager.blockManagerId.executorId) {
    -        // Filter out zero-sized blocks
    -        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
    +        blockInfos.find(_._2 <= 0) match {
    +          case Some((blockId, size)) if size < 0 =>
    +            throw new BlockException(blockId, "Negative block size " + size)
    +          case Some((blockId, size)) if size == 0 =>
    +            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")

--- End diff --

I think that failing with an exception here is a great idea, so thanks for adding these checks. In general, I'm in favor of adding explicit fail-fast checks for invariants like this because it can help to defend against silent corruption bugs.
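The fail-fast pattern discussed in this review can be illustrated with a minimal, self-contained sketch. This is not Spark's actual code: `FailFastSketch` and `validateLocalBlocks` are illustrative names, and `BlockException` here is a local stand-in for Spark's `org.apache.spark.storage.BlockException`.

```scala
// Minimal sketch of fail-fast validation of local shuffle block sizes:
// instead of silently filtering out bad sizes, detect them up front and throw,
// so a corrupted MapStatus surfaces as an error rather than silent data loss.
object FailFastSketch {
  final case class BlockException(blockId: String, message: String)
    extends Exception(s"Block $blockId: $message")

  // Returns the block ids if every size is strictly positive; throws otherwise.
  def validateLocalBlocks(blockInfos: Seq[(String, Long)]): Seq[String] =
    blockInfos.find(_._2 <= 0) match {
      case Some((blockId, size)) if size < 0 =>
        throw BlockException(blockId, "Negative block size " + size)
      case Some((blockId, _)) =>
        throw BlockException(blockId, "Zero-sized blocks should be excluded.")
      case None =>
        blockInfos.map(_._1)
    }
}
```

For example, `FailFastSketch.validateLocalBlocks(Seq(("b1", 10L), ("b2", 3L)))` returns `Seq("b1", "b2")`, while a zero or negative size anywhere in the input raises a `BlockException` naming the offending block.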
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/21212#discussion_r186293064

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---

    @@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator(
         // at most maxBytesInFlight in order to limit the amount of data in flight.
         val remoteRequests = new ArrayBuffer[FetchRequest]
    -    // Tracks total number of blocks (including zero sized blocks)
    -    var totalBlocks = 0
         for ((address, blockInfos) <- blocksByAddress) {
    -      totalBlocks += blockInfos.size
           if (address.executorId == blockManager.blockManagerId.executorId) {
    -        // Filter out zero-sized blocks
    -        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
    +        blockInfos.find(_._2 <= 0) match {
    +          case Some((blockId, size)) if size < 0 =>
    +            throw new BlockException(blockId, "Negative block size " + size)
    +          case Some((blockId, size)) if size == 0 =>
    +            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")

--- End diff --

Added another check for remote blocks.
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21212#discussion_r186261650

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---

    @@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator(
         // at most maxBytesInFlight in order to limit the amount of data in flight.
         val remoteRequests = new ArrayBuffer[FetchRequest]
    -    // Tracks total number of blocks (including zero sized blocks)
    -    var totalBlocks = 0
         for ((address, blockInfos) <- blocksByAddress) {
    -      totalBlocks += blockInfos.size
           if (address.executorId == blockManager.blockManagerId.executorId) {
    -        // Filter out zero-sized blocks
    -        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
    +        blockInfos.find(_._2 <= 0) match {
    +          case Some((blockId, size)) if size < 0 =>
    +            throw new BlockException(blockId, "Negative block size " + size)
    +          case Some((blockId, size)) if size == 0 =>
    +            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")

--- End diff --

Is it necessary to throw an exception here? If so, shall we also throw an exception when we detect a 0-sized **remote** block, rather than skipping it silently?
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/21212#discussion_r186254447

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---

    @@ -267,28 +269,28 @@ final class ShuffleBlockFetcherIterator(
         // at most maxBytesInFlight in order to limit the amount of data in flight.
         val remoteRequests = new ArrayBuffer[FetchRequest]
    -    // Tracks total number of blocks (including zero sized blocks)
    -    var totalBlocks = 0
         for ((address, blockInfos) <- blocksByAddress) {
    -      totalBlocks += blockInfos.size
           if (address.executorId == blockManager.blockManagerId.executorId) {
    -        // Filter out zero-sized blocks
    -        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
    +        blockInfos.find(_._2 < 0) match {

--- End diff --

Yes, I added a check for `_._2 == 0`.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21212#discussion_r185980680

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---

    @@ -267,28 +269,28 @@ final class ShuffleBlockFetcherIterator(
         // at most maxBytesInFlight in order to limit the amount of data in flight.
         val remoteRequests = new ArrayBuffer[FetchRequest]
    -    // Tracks total number of blocks (including zero sized blocks)
    -    var totalBlocks = 0
         for ((address, blockInfos) <- blocksByAddress) {
    -      totalBlocks += blockInfos.size
           if (address.executorId == blockManager.blockManagerId.executorId) {
    -        // Filter out zero-sized blocks
    -        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
    +        blockInfos.find(_._2 < 0) match {

--- End diff --

Shall we use `_._2 <= 0` to make sure the 0-size blocks are filtered?
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/21212

[SPARK-24143] filter empty blocks when convert mapstatus to (blockId, size) pair.

## What changes were proposed in this pull request?

In the current code (`MapOutputTracker.convertMapStatuses`), map statuses are converted to (blockId, size) pairs for all blocks, whether a block is empty or not. This results in OOM when there are lots of consecutive empty blocks, especially when adaptive execution is enabled. The (blockId, size) pairs are only used in `ShuffleBlockFetcherIterator` to control shuffle reads, and only requests for non-empty blocks are sent. Can we just filter out the empty blocks in `MapOutputTracker.convertMapStatuses` and save memory?

## How was this patch tested?

Not added yet.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-24143

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21212.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21212

commit 5211ebd5cc4d3de023752b8ab8168d7bda18aa83
Author: jinxing
Date: 2018-05-02T05:40:34Z

    [SPARK-24143] filter empty blocks when convert mapstatus to (blockId, size) pair.
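The idea proposed in the PR description can be sketched with a simplified, self-contained model. This is not Spark's actual `MapOutputTracker.convertMapStatuses` implementation: `ConvertSketch`, `convertMapSizes`, and the local `ShuffleBlockId` case class are all illustrative names introduced here for the sketch.

```scala
// Simplified model of converting per-map shuffle output sizes into
// (blockId, size) pairs. Skipping zero-sized blocks at conversion time means
// long runs of empty blocks never materialize as pairs, avoiding the OOM
// the PR describes.
object ConvertSketch {
  final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)

  // sizesByMap(mapId)(reduceId) is the block size reported by that map's status.
  def convertMapSizes(
      shuffleId: Int,
      sizesByMap: Array[Array[Long]]): Seq[(ShuffleBlockId, Long)] =
    for {
      (sizes, mapId) <- sizesByMap.zipWithIndex.toSeq
      (size, reduceId) <- sizes.zipWithIndex
      if size > 0 // filter empty blocks instead of emitting a pair for each
    } yield (ShuffleBlockId(shuffleId, mapId, reduceId), size)
}
```

For example, with two maps reporting sizes `[0, 5, 0]` and `[3, 0]`, only two pairs survive the conversion, regardless of how many empty blocks the map statuses contain.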