Repository: spark
Updated Branches:
  refs/heads/branch-2.3 578607b30 -> 1f180cd12


[SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

SPARK-11141 introduced a race condition that could cause data loss.
The problem is that the ReceivedBlockTracker.insertAllocatedBatch function assumes
that all blocks in streamIdToUnallocatedBlockQueues are allocated to the batch,
so it clears the whole queue.

With this PR, only the blocks that were actually allocated are removed from the
queue, which prevents the data loss.

Tested with an additional unit test and manually.

Author: Gabor Somogyi <gabor.g.somo...@gmail.com>

Closes #20620 from gaborgsomogyi/SPARK-23438.

(cherry picked from commit b308182f233b8840dfe0e6b5736d2f2746f40757)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f180cd1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f180cd1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f180cd1

Branch: refs/heads/branch-2.3
Commit: 1f180cd121b13ecd455bee55ed2224936a2f3b2a
Parents: 578607b
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
Authored: Mon Feb 26 08:39:44 2018 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Feb 26 08:55:57 2018 -0800

----------------------------------------------------------------------
 .../scheduler/ReceivedBlockTracker.scala        | 11 ++++++----
 .../streaming/ReceivedBlockTrackerSuite.scala   | 23 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1f180cd1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5d9a8ac..dacff69 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
       getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
     }
 
-    // Insert the recovered block-to-batch allocations and clear the queue of 
received blocks
-    // (when the blocks were originally allocated to the batch, the queue must 
have been cleared).
+    // Insert the recovered block-to-batch allocations and removes them from 
queue of
+    // received blocks.
     def insertAllocatedBatch(batchTime: Time, allocatedBlocks: 
AllocatedBlocks) {
       logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
         s"${allocatedBlocks.streamIdToAllocatedBlocks}")
-      streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
+      allocatedBlocks.streamIdToAllocatedBlocks.foreach {
+        case (streamId, allocatedBlocksInStream) =>
+          
getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet)
+      }
       timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
       lastAllocatedBatchTime = batchTime
     }
@@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker(
   }
 
   /** Write an update to the tracker to the write ahead log */
-  private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
+  private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): 
Boolean = {
     if (isWriteAheadLogEnabled) {
       logTrace(s"Writing record: $record")
       try {

http://git-wip-us.apache.org/repos/asf/spark/blob/1f180cd1/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 107c3f5..4fa236b 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, 
SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _}
 import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("recovery with write ahead logs should remove only allocated blocks 
from received queue") {
+    val manualClock = new ManualClock
+    val batchTime = manualClock.getTimeMillis()
+
+    val tracker1 = createTracker(clock = manualClock)
+    tracker1.isWriteAheadLogEnabled should be (true)
+
+    val allocatedBlockInfos = generateBlockInfos()
+    val unallocatedBlockInfos = generateBlockInfos()
+    val receivedBlockInfos = allocatedBlockInfos ++ unallocatedBlockInfos
+    receivedBlockInfos.foreach { b => 
tracker1.writeToLog(BlockAdditionEvent(b)) }
+    val allocatedBlocks = AllocatedBlocks(Map(streamId -> allocatedBlockInfos))
+    tracker1.writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
+    tracker1.stop()
+
+    val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog 
= true)
+    tracker2.getBlocksOfBatch(batchTime) shouldEqual 
allocatedBlocks.streamIdToAllocatedBlocks
+    tracker2.getUnallocatedBlocks(streamId) shouldEqual unallocatedBlockInfos
+    tracker2.stop()
+  }
+
   test("recovery and cleanup with write ahead logs") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that 
every increment creates


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to