[GitHub] spark pull request #21430: [SPARK-23991][DSTREAMS] Fix data loss when WAL wr...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21430

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/21430#discussion_r191130186

--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -115,6 +117,50 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }

+  test("block allocation to batch should not loose blocks from received queue") {
+    val tracker1 = createTracker(createSpyTracker = true)
+    tracker1.isWriteAheadLogEnabled should be (true)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+    // Add blocks
+    val blockInfos = generateBlockInfos()
+    blockInfos.map(tracker1.addBlock)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+    // Try to allocate the blocks to a batch and verify that it's failing
+    // The blocks should stay in the received queue when WAL write failing
+    doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+      .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+    try {
--- End diff --

Changed.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/21430#discussion_r191130212

--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -308,12 +354,16 @@ class ReceivedBlockTrackerSuite
    * want to control time by manually incrementing it to test log clean.
    */
   def createTracker(
+      createSpyTracker: Boolean = false,
       setCheckpointDir: Boolean = true,
       recoverFromWriteAheadLog: Boolean = false,
       clock: Clock = new SystemClock): ReceivedBlockTracker = {
     val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-    val tracker = new ReceivedBlockTracker(
+    var tracker = new ReceivedBlockTracker(
       conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
+    if (createSpyTracker) {
+      tracker = spy(tracker)
--- End diff --

Changed.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/21430#discussion_r191130178

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
@@ -112,10 +112,13 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach {
--- End diff --

Changed.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21430#discussion_r190992559

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
@@ -112,10 +112,13 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach {
--- End diff --

nit: use parentheses when using placeholders (`_`), braces otherwise.
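The core of the fix in the diff above is the clone-then-dequeue ordering: snapshot the received queue, attempt the WAL write, and drain the queue only on success. A minimal self-contained sketch of that pattern with a plain `mutable.Queue` (the `AllocateSketch` object, the `succeeds` flag, and the simulated `writeToLog` are hypothetical stand-ins; the real tracker writes a `BatchAllocationEvent` to the write-ahead log):

```scala
import scala.collection.mutable

object AllocateSketch {
  // Hypothetical stand-in for the tracker's per-stream received-block queue.
  val receivedQueue = mutable.Queue("block-1", "block-2", "block-3")

  // Simulated WAL write; `succeeds` controls the outcome for the sketch.
  def writeToLog(event: Seq[String], succeeds: Boolean): Boolean = succeeds

  def allocate(succeeds: Boolean): Seq[String] = {
    // Snapshot the queue instead of draining it up front (the fix).
    val snapshot = receivedQueue.clone().toList
    if (writeToLog(snapshot, succeeds)) {
      // Drain the queue only after the WAL write succeeded.
      receivedQueue.dequeueAll(_ => true)
      snapshot
    } else {
      // WAL write failed: the queue is untouched, so the blocks remain
      // available for the next allocation attempt.
      Seq.empty
    }
  }
}
```

With this ordering, a failed `writeToLog` leaves all three blocks in `receivedQueue`, while a successful one returns the snapshot and empties the queue.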
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21430#discussion_r190992915

--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -115,6 +117,50 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }

+  test("block allocation to batch should not loose blocks from received queue") {
+    val tracker1 = createTracker(createSpyTracker = true)
+    tracker1.isWriteAheadLogEnabled should be (true)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+    // Add blocks
+    val blockInfos = generateBlockInfos()
+    blockInfos.map(tracker1.addBlock)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+    // Try to allocate the blocks to a batch and verify that it's failing
+    // The blocks should stay in the received queue when WAL write failing
+    doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+      .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+    try {
--- End diff --

`intercept[RuntimeException] { ... }`
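The suggestion above is to replace the manual `try`/`catch` with ScalaTest's `intercept`, which runs a block, asserts that an exception of the expected type is thrown, and returns it. To keep the illustration self-contained (without a ScalaTest dependency), here is a minimal hand-rolled helper with the same shape; the `InterceptSketch` object and helper body are hypothetical, only the `intercept[T] { ... }` calling convention mirrors the real ScalaTest API:

```scala
import scala.reflect.ClassTag

object InterceptSketch {
  // Minimal stand-in for ScalaTest's intercept[T] { ... }: evaluates the body,
  // fails if no exception (or one of the wrong type) is thrown, and otherwise
  // returns the caught exception for further assertions.
  def intercept[T <: Throwable](body: => Any)(implicit ct: ClassTag[T]): T = {
    try {
      body
      throw new AssertionError(
        s"Expected ${ct.runtimeClass.getName} but no exception was thrown")
    } catch {
      case t: Throwable if ct.runtimeClass.isInstance(t) => t.asInstanceOf[T]
    }
  }
}
```

Usage mirrors the reviewed test: `val e = InterceptSketch.intercept[RuntimeException] { throw new RuntimeException("Not able to write BatchAllocationEvent") }` and then assertions on `e.getMessage`, which is more compact than catching and flagging the exception manually.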
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21430#discussion_r190993327

--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -308,12 +354,16 @@ class ReceivedBlockTrackerSuite
    * want to control time by manually incrementing it to test log clean.
    */
   def createTracker(
+      createSpyTracker: Boolean = false,
       setCheckpointDir: Boolean = true,
       recoverFromWriteAheadLog: Boolean = false,
       clock: Clock = new SystemClock): ReceivedBlockTracker = {
     val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-    val tracker = new ReceivedBlockTracker(
+    var tracker = new ReceivedBlockTracker(
       conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
+    if (createSpyTracker) {
+      tracker = spy(tracker)
--- End diff --

Why not do this in the caller? Less code and cleaner.
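The review point here is a refactoring choice: instead of threading a `createSpyTracker` flag through the factory (and turning `val tracker` into a `var`), the one caller that needs a spy can wrap the factory's result itself, e.g. `spy(createTracker())` with Mockito. A self-contained sketch of that shape, with the Mockito spy replaced by a hypothetical hand-written wrapper (`Tracker`, `failingSpy`, and `SpyAtCallSiteSketch` are all illustrative names, not Spark classes):

```scala
object SpyAtCallSiteSketch {
  // Hypothetical tracker with the one method the test wants to stub.
  class Tracker {
    def writeToLog(event: String): Boolean = true
  }

  // Factory stays simple: no spy flag, and the result can stay a val.
  def createTracker(): Tracker = new Tracker

  // Stand-in for Mockito's spy(...) plus a doThrow stub: wraps the real
  // tracker and overrides writeToLog to simulate a WAL failure.
  def failingSpy(real: Tracker): Tracker = new Tracker {
    override def writeToLog(event: String): Boolean =
      throw new RuntimeException("Not able to write BatchAllocationEvent")
  }
}
```

The caller that exercises the failure path writes `failingSpy(SpyAtCallSiteSketch.createTracker())`, while every other test keeps calling `createTracker()` untouched, which is the "less code and cleaner" outcome the reviewer is after.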
GitHub user gaborgsomogyi opened a pull request: https://github.com/apache/spark/pull/21430

[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

## What changes were proposed in this pull request?

When blocks are allocated to a batch and the WAL write fails, the blocks are still removed from the received block queue. This produces data loss, because the next allocation will not find those blocks in the queue. With this PR, blocks are removed from the received queue only if the WAL write succeeded.

## How was this patch tested?

Additional unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gaborgsomogyi/spark SPARK-23991

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21430.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21430

commit 2d35dfacd54d747e6a4167d46234d4b3ce87529b
Author: Gabor Somogyi
Date: 2018-05-25T12:52:36Z

    [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch
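The failure mode the PR description describes can be reproduced in miniature: drain the queue first, then attempt the WAL write, and a write failure leaves the blocks nowhere. A minimal sketch of the pre-fix ordering (`DataLossSketch` and the always-failing `writeToLog` are hypothetical stand-ins for the tracker internals):

```scala
import scala.collection.mutable

object DataLossSketch {
  // Hypothetical received-block queue and a WAL write that always fails.
  val receivedQueue = mutable.Queue("block-1", "block-2")
  def writeToLog(event: Seq[String]): Boolean = false

  // Pre-fix ordering: the queue is drained BEFORE the WAL write is attempted.
  def allocatePreFix(): Seq[String] = {
    val drained = receivedQueue.dequeueAll(_ => true).toList
    if (writeToLog(drained)) {
      drained
    } else {
      // WAL write failed, but the queue is already empty: the blocks are in
      // neither the queue nor any allocated batch -- they are lost.
      Seq.empty
    }
  }
}
```

After `allocatePreFix()` runs against the failing WAL, both the returned allocation and `receivedQueue` are empty, which is exactly the data loss the patch removes by snapshotting with `clone()` and dequeuing only after a successful write.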