Repository: spark
Updated Branches:
  refs/heads/master 23db600c9 -> aca65c63c


[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

When blocks are allocated to a batch and the WAL write fails, the blocks are 
nevertheless removed from the received block queue. This causes data loss, 
because the next allocation attempt will not find those blocks in the queue.

With this PR, blocks are removed from the received queue only if the WAL write 
succeeded.
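
The gist of the fix, as a minimal standalone sketch (the queue contents and the
failing writeToLog below are hypothetical stand-ins, not the actual tracker
types):

  import scala.collection.mutable

  object AllocationOrderSketch {
    // Hypothetical stand-in for a WAL write that fails.
    def writeToLog(event: String): Boolean = false

    def main(args: Array[String]): Unit = {
      // Old ordering: dequeueAll empties the queue before the write outcome
      // is known, so a failed write leaves the blocks nowhere.
      val oldQueue = mutable.Queue("block-1", "block-2")
      val dequeued = oldQueue.dequeueAll(_ => true)
      if (!writeToLog(s"BatchAllocationEvent($dequeued)")) {
        assert(oldQueue.isEmpty)  // blocks are already gone, i.e. lost
      }

      // New ordering: snapshot with clone(), clear only after a successful
      // write.
      val newQueue = mutable.Queue("block-1", "block-2")
      val snapshot = newQueue.clone()
      if (writeToLog(s"BatchAllocationEvent($snapshot)")) {
        newQueue.clear()
      }
      assert(newQueue.nonEmpty)  // write failed, blocks remain for retry
    }
  }

Cloning costs one extra queue copy per batch, but the queue stays intact until
the BatchAllocationEvent is durably logged, which is what recovery relies on.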

An additional unit test verifies that blocks stay in the received queue when 
the WAL write fails and can be allocated by a subsequent attempt.

Author: Gabor Somogyi <gabor.g.somo...@gmail.com>

Closes #21430 from gaborgsomogyi/SPARK-23991.

Change-Id: I5ead84f0233f0c95e6d9f2854ac2ff6906f6b341


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aca65c63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aca65c63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aca65c63

Branch: refs/heads/master
Commit: aca65c63cb12073eb193fe08998994c60acb8b58
Parents: 23db600
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
Authored: Tue May 29 20:10:59 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Tue May 29 20:10:59 2018 +0800

----------------------------------------------------------------------
 .../scheduler/ReceivedBlockTracker.scala        |  3 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 47 +++++++++++++++++++-
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aca65c63/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index dacff69..cf43245 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -112,10 +112,11 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach(getReceivedBlockQueue(_).clear())
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/aca65c63/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 4fa236b..fd7e00b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doThrow, reset, spy}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -115,6 +117,47 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }
 
+  test("block allocation to batch should not loose blocks from received 
queue") {
+    val tracker1 = spy(createTracker())
+    tracker1.isWriteAheadLogEnabled should be (true)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+    // Add blocks
+    val blockInfos = generateBlockInfos()
+    blockInfos.map(tracker1.addBlock)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+    // Try to allocate the blocks to a batch and verify that it fails.
+    // The blocks should stay in the received queue when the WAL write fails.
+    doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+      .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+    val errMsg = intercept[RuntimeException] {
+      tracker1.allocateBlocksToBatch(1)
+    }
+    assert(errMsg.getMessage === "Not able to write BatchAllocationEvent")
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+    tracker1.getBlocksOfBatch(1) shouldEqual Map.empty
+    tracker1.getBlocksOfBatchAndStream(1, streamId) shouldEqual Seq.empty
+
+    // Allocate the blocks to a batch and verify that all of them have been allocated
+    reset(tracker1)
+    tracker1.allocateBlocksToBatch(2)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    tracker1.hasUnallocatedReceivedBlocks should be (false)
+    tracker1.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
+    tracker1.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
+
+    tracker1.stop()
+
+    // Recover from the WAL and verify correctness
+    val tracker2 = createTracker(recoverFromWriteAheadLog = true)
+    tracker2.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    tracker2.hasUnallocatedReceivedBlocks should be (false)
+    tracker2.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
+    tracker2.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
+    tracker2.stop()
+  }
+
   test("recovery and cleanup with write ahead logs") {
     val manualClock = new ManualClock
    // Set the time increment level to twice the rotation interval so that every increment creates
@@ -312,7 +355,7 @@ class ReceivedBlockTrackerSuite
       recoverFromWriteAheadLog: Boolean = false,
       clock: Clock = new SystemClock): ReceivedBlockTracker = {
    val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-    val tracker = new ReceivedBlockTracker(
+    var tracker = new ReceivedBlockTracker(
      conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
     allReceivedBlockTrackers += tracker
     tracker

