Repository: spark
Updated Branches:
  refs/heads/branch-1.2 5d07488ad -> 5aaf0e0ff


[SPARK-5233][Streaming] Fix bug introduced by erroneous replaying of WAL

Because `BlockAllocationEvent` is missing during WAL recovery, the dangling 
events are mixed into the new batch, which leads to wrong results. Details can 
be seen in [SPARK-5233](https://issues.apache.org/jira/browse/SPARK-5233).

Author: jerryshao <saisai.s...@intel.com>

Closes #4032 from jerryshao/SPARK-5233 and squashes the following commits:

f0b0c0b [jerryshao] Further address the comments
a237c75 [jerryshao] Address the comments
e356258 [jerryshao] Fix bug in unit test
558bdc3 [jerryshao] Correctly replay the WAL log when recovering from failure

(cherry picked from commit 3c3fa632e6ba45ce536065aa1145698385301fb2)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5aaf0e0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5aaf0e0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5aaf0e0f

Branch: refs/heads/branch-1.2
Commit: 5aaf0e0ff5e5082c0064f5f4065cd66a62aa72d6
Parents: 5d07488
Author: jerryshao <saisai.s...@intel.com>
Authored: Thu Jan 22 21:58:53 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Jan 22 21:59:22 2015 -0800

----------------------------------------------------------------------
 .../examples/streaming/KafkaWordCount.scala     |  2 +-
 .../streaming/scheduler/JobGenerator.scala      | 18 ++++++++++++------
 .../scheduler/ReceivedBlockTracker.scala        | 12 +++++++++---
 .../streaming/ReceivedBlockTrackerSuite.scala   | 20 ++++++++++----------
 4 files changed, 32 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5aaf0e0f/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
 
b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index c9e1511..82aeaaf 100644
--- 
a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ 
b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -77,7 +77,7 @@ object KafkaWordCountProducer {
 
     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
 
-    // Zookeper connection properties
+    // Zookeeper connection properties
     val props = new Properties()
     props.put("metadata.broker.list", brokers)
     props.put("serializer.class", "kafka.serializer.StringEncoder")

http://git-wip-us.apache.org/repos/asf/spark/blob/5aaf0e0f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index d86f852..8632c94 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.streaming.scheduler
 
-import akka.actor.{ActorRef, ActorSystem, Props, Actor}
-import org.apache.spark.{SparkException, SparkEnv, Logging}
-import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
-import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 import scala.util.{Failure, Success, Try}
 
+import akka.actor.{ActorRef, Props, Actor}
+
+import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.util.{Clock, ManualClock, RecurringTimer}
+
 /** Event classes for JobGenerator */
 private[scheduler] sealed trait JobGeneratorEvent
 private[scheduler] case class GenerateJobs(time: Time) extends 
JobGeneratorEvent
@@ -206,9 +208,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends 
Logging {
     val timesToReschedule = (pendingTimes ++ 
downTimes).distinct.sorted(Time.ordering)
     logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " 
+
       timesToReschedule.mkString(", "))
-    timesToReschedule.foreach(time =>
+    timesToReschedule.foreach { time =>
+      // Allocate the related blocks when recovering from failure, because 
some blocks that were
+      // added but not allocated, are dangling in the queue after recovering, 
we have to allocate
+      // those blocks to the next batch, which is the batch they were supposed 
to go.
+      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate 
received blocks to batch
       jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
-    )
+    }
 
     // Restart the timer
     timer.start(restartTime.milliseconds)

http://git-wip-us.apache.org/repos/asf/spark/blob/5aaf0e0f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index ef23b5c..e19ac93 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -67,7 +67,7 @@ private[streaming] class ReceivedBlockTracker(
   extends Logging {
 
   private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
-  
+
   private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, 
ReceivedBlockQueue]
   private val timeToAllocatedBlocks = new mutable.HashMap[Time, 
AllocatedBlocks]
   private val logManagerOption = createLogManager()
@@ -107,8 +107,14 @@ private[streaming] class ReceivedBlockTracker(
       lastAllocatedBatchTime = batchTime
       allocatedBlocks
     } else {
-      throw new SparkException(s"Unexpected allocation of blocks, " +
-        s"last batch = $lastAllocatedBatchTime, batch time to allocate = 
$batchTime  ")
+      // This situation occurs when:
+      // 1. WAL is ended with BatchAllocationEvent, but without 
BatchCleanupEvent,
+      // possibly processed batch job or half-processed batch job need to be 
processed again,
+      // so the batchTime will be equal to lastAllocatedBatchTime.
+      // 2. Slow checkpointing makes recovered batch time older than WAL 
recovered
+      // lastAllocatedBatchTime.
+      // This situation will only occurs in recovery time.
+      logInfo(s"Possibly processed batch $batchTime need to be processed again 
in WAL recovery")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5aaf0e0f/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index de7e9d6..fbb7b0b 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -82,15 +82,15 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.allocateBlocksToBatch(2)
     receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
 
-    // Verify that batch 2 cannot be allocated again
-    intercept[SparkException] {
-      receivedBlockTracker.allocateBlocksToBatch(2)
-    }
+    // Verify that older batches have no operation on batch allocation,
+    // will return the same blocks as previously allocated.
+    receivedBlockTracker.allocateBlocksToBatch(1)
+    receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual 
blockInfos
 
-    // Verify that older batches cannot be allocated again
-    intercept[SparkException] {
-      receivedBlockTracker.allocateBlocksToBatch(1)
-    }
+    blockInfos.map(receivedBlockTracker.addBlock)
+    receivedBlockTracker.allocateBlocksToBatch(2)
+    receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
+    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
   test("block addition, block to batch allocation and cleanup with write ahead 
log") {
@@ -186,14 +186,14 @@ class ReceivedBlockTrackerSuite
     tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty  
// should be cleaned
     tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual 
blockInfos2
   }
-  
+
   test("enabling write ahead log but not setting checkpoint dir") {
     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
     intercept[SparkException] {
       createTracker(setCheckpointDir = false)
     }
   }
-  
+
   test("setting checkpoint dir but not enabling write ahead log") {
     // When WAL config is not set, log manager should not be enabled
     val tracker1 = createTracker(setCheckpointDir = true)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to