Repository: spark
Updated Branches:
  refs/heads/branch-1.3 1723f0591 -> 03e263f5b


[SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start checkpoint

This is an alternative approach to https://github.com/apache/spark/pull/4964/.
I think this is a simpler fix that can be backported easily to other branches
(1.2 and 1.3).

All it does is introduce a flag so that the pre-batch-start checkpoint does not
trigger the clearing of checkpoint data.
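
In sketch form (names taken verbatim from the patch below; the event-loop
plumbing is elided, so read this as an outline rather than the exact code):

    // The DoCheckpoint event now carries a flag saying whether old checkpoint
    // data may be cleared once this checkpoint has been written.
    private[scheduler] case class DoCheckpoint(
        time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent

    // Pre-batch-start checkpoint: must NOT clear old checkpoint data.
    eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false)

    // Post-batch-completion checkpoint: safe to clear old checkpoint data.
    eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true)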

There is no unit test yet. I will add one once this approach has been commented
on. Not sure if this is easily testable.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #5008 from tdas/SPARK-6222 and squashes the following commits:

7315bc2 [Tathagata Das] Removed empty line.
c438de4 [Tathagata Das] Revert unnecessary change.
5e98374 [Tathagata Das] Added unit test
50cb60b [Tathagata Das] Fixed style issue
295ca5c [Tathagata Das] Fixing SPARK-6222

(cherry picked from commit 645cf3fcc21987417b2946bdeeeb60af3edf667e)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03e263f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03e263f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03e263f5

Branch: refs/heads/branch-1.3
Commit: 03e263f5b527cf574f4ffcd5cd886f7723e3756e
Parents: 1723f05
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Thu Mar 19 02:15:50 2015 -0400
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Mar 19 02:16:07 2015 -0400

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala |  12 +-
 .../streaming/scheduler/JobGenerator.scala      |  20 +--
 .../streaming/scheduler/JobGeneratorSuite.scala | 133 +++++++++++++++++++
 3 files changed, 153 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03e263f5/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 06e82f7..832ce78 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -119,7 +119,10 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
-  class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
+  class CheckpointWriteHandler(
+      checkpointTime: Time,
+      bytes: Array[Byte],
+      clearCheckpointDataLater: Boolean) extends Runnable {
     def run() {
       var attempts = 0
       val startTime = System.currentTimeMillis()
@@ -166,7 +169,7 @@ class CheckpointWriter(
           val finishTime = System.currentTimeMillis()
           logInfo("Checkpoint for time " + checkpointTime + " saved to file '" 
+ checkpointFile +
             "', took " + bytes.length + " bytes and " + (finishTime - 
startTime) + " ms")
-          jobGenerator.onCheckpointCompletion(checkpointTime)
+          jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
           return
         } catch {
           case ioe: IOException =>
@@ -180,7 +183,7 @@ class CheckpointWriter(
     }
   }
 
-  def write(checkpoint: Checkpoint) {
+  def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
     val bos = new ByteArrayOutputStream()
     val zos = compressionCodec.compressedOutputStream(bos)
     val oos = new ObjectOutputStream(zos)
@@ -188,7 +191,8 @@ class CheckpointWriter(
     oos.close()
     bos.close()
     try {
-      executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+      executor.execute(new CheckpointWriteHandler(
+        checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
       logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " 
writer queue")
     } catch {
       case rej: RejectedExecutionException =>
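
The signature change makes the cleanup decision explicit at every call site.
A hedged sketch of the caller, condensed from the JobGenerator hunk below; the
single-threaded-executor detail is an assumption from memory of this class,
not something shown in the hunk above:

    // Assumed: the writer's executor is a one-thread pool, so completion
    // callbacks (and hence any cleanup) fire in submission order.
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)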

http://git-wip-us.apache.org/repos/asf/spark/blob/03e263f5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index ac92774..59488df 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -30,7 +30,8 @@ import org.apache.spark.util.{Clock, ManualClock}
 private[scheduler] sealed trait JobGeneratorEvent
 private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
 private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
-private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
+private[scheduler] case class DoCheckpoint(
+    time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
 private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent
 
 /**
@@ -163,8 +164,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   /**
    * Callback called when the checkpoint of a batch has been written.
    */
-  def onCheckpointCompletion(time: Time) {
-    eventActor ! ClearCheckpointData(time)
+  def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
+    if (clearCheckpointDataLater) {
+      eventActor ! ClearCheckpointData(time)
+    }
   }
 
   /** Processes all events */
@@ -173,7 +176,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     event match {
       case GenerateJobs(time) => generateJobs(time)
       case ClearMetadata(time) => clearMetadata(time)
-      case DoCheckpoint(time) => doCheckpoint(time)
+      case DoCheckpoint(time, clearCheckpointDataLater) =>
+        doCheckpoint(time, clearCheckpointDataLater)
       case ClearCheckpointData(time) => clearCheckpointData(time)
     }
   }
@@ -245,7 +249,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
     }
-    eventActor ! DoCheckpoint(time)
+    eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false)
   }
 
   /** Clear DStream metadata for the given `time`. */
@@ -255,7 +259,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // If checkpointing is enabled, then checkpoint,
     // else mark batch to be fully processed
     if (shouldCheckpoint) {
-      eventActor ! DoCheckpoint(time)
+      eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true)
     } else {
       // If checkpointing is not enabled, then delete metadata information about
       // received blocks (block data not saved in any case). Otherwise, wait for
@@ -278,11 +282,11 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   }
 
   /** Perform checkpoint for the given `time`. */
-  private def doCheckpoint(time: Time) {
+  private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
     if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
       logInfo("Checkpointing graph for time " + time)
       ssc.graph.updateCheckpointData(time)
-      checkpointWriter.write(new Checkpoint(ssc, time))
+      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
     }
   }
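
To make the flag's effect concrete, an illustrative timeline (the timings are
hypothetical; the batch-3/batch-7 scenario comes from the test below):

    // Hypothetical timeline with a 1s batch interval:
    // t=3s  batch 3 generated -> DoCheckpoint(flag = false); batch 3 starts
    // t=7s  batch 7 generated while batch 3 is still running
    //       before this patch: the checkpoint at t=7s cleared checkpoint data
    //       (and block metadata) up through the 6th batch, deleting batch 3's
    //       block metadata mid-run
    //       after this patch:  cleanup happens only via batch 3's own
    //       post-completion DoCheckpoint(flag = true)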
 

http://git-wip-us.apache.org/repos/asf/spark/blob/03e263f5/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
new file mode 100644
index 0000000..4150b60
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import java.util.concurrent.CountDownLatch
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming._
+import org.apache.spark.util.{ManualClock, Utils}
+
+class JobGeneratorSuite extends TestSuiteBase {
+
+  // SPARK-6222 is a tricky regression bug which causes received block metadata
+  // to be deleted before the corresponding batch has completed. This occurs when
+  // the following conditions are met.
+  // 1. streaming checkpointing is enabled by setting streamingContext.checkpoint(dir)
+  // 2. input data is received through a receiver as blocks
+  // 3. a batch processing a set of blocks takes a long time, such that a few
+  //    subsequent batches have been generated and submitted for processing.
+  //
+  // The JobGenerator (as of Mar 16, 2015) checkpoints twice per batch, once after
+  // the generation of a batch, and another time after the completion of a batch.
+  // The cleanup of checkpoint data (including block metadata, etc.) from the
+  // DStream must be done only after the 2nd checkpoint has completed, that is,
+  // after the batch has been completely processed. However, the issue is that the
+  // checkpoint data, and along with it the received block metadata, is cleaned up
+  // even in the case of the 1st checkpoint, causing premature deletion of received
+  // block data. For example, if the 3rd batch is still being processed, the 7th
+  // batch may get generated, and the corresponding "1st checkpoint" will delete
+  // received block metadata of batches older than the 6th. That is, the 3rd
+  // batch's block metadata gets deleted even before the 3rd batch has been
+  // completely processed.
+  //
+  // This test tries to create that scenario by doing the following.
+  // 1. enable checkpointing
+  // 2. generate batches with received blocks
+  // 3. make the 3rd batch never complete
+  // 4. allow subsequent batches to be generated (to allow premature deletion of
+  //    the 3rd batch's metadata)
+  // 5. verify whether the 3rd batch's block metadata still exists
+  //
+  test("SPARK-6222: Do not clear received block data too soon") {
+    import JobGeneratorSuite._
+    val checkpointDir = Utils.createTempDir()
+    val testConf = conf
+    testConf.set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
+    testConf.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+
+    withStreamingContext(new StreamingContext(testConf, batchDuration)) { ssc =>
+      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+      val numBatches = 10
+      val longBatchNumber = 3 // 3rd batch will take a long time
+      val longBatchTime = longBatchNumber * batchDuration.milliseconds
+
+      val testTimeout = timeout(10 seconds)
+      val inputStream = ssc.receiverStream(new TestReceiver)
+
+      inputStream.foreachRDD((rdd: RDD[Int], time: Time) => {
+        if (time.milliseconds == longBatchTime) {
+          while (waitLatch.getCount() > 0) {
+            waitLatch.await()
+            println("Await over")
+          }
+        }
+      })
+
+      val batchCounter = new BatchCounter(ssc)
+      ssc.checkpoint(checkpointDir.getAbsolutePath)
+      ssc.start()
+
+      // Make sure that only 1 batch of information is to be remembered
+      assert(inputStream.rememberDuration === batchDuration)
+      val receiverTracker = ssc.scheduler.receiverTracker
+
+      // Get the blocks belonging to a batch
+      def getBlocksOfBatch(batchTime: Long) = {
+        receiverTracker.getBlocksOfBatchAndStream(Time(batchTime), inputStream.id)
+      }
+
+      // Wait for new blocks to be received
+      def waitForNewReceivedBlocks() {
+        eventually(testTimeout) {
+          assert(receiverTracker.hasUnallocatedBlocks)
+        }
+      }
+
+      // Wait for received blocks to be allocated to a batch
+      def waitForBlocksToBeAllocatedToBatch(batchTime: Long) {
+        eventually(testTimeout) {
+          assert(getBlocksOfBatch(batchTime).nonEmpty)
+        }
+      }
+
+      // Generate a large number of batches with blocks in them
+      for (batchNum <- 1 to numBatches) {
+        waitForNewReceivedBlocks()
+        clock.advance(batchDuration.milliseconds)
+        waitForBlocksToBeAllocatedToBatch(clock.getTimeMillis())
+      }
+
+      // Wait for 3rd batch to start
+      eventually(testTimeout) {
+        ssc.scheduler.getPendingTimes().contains(Time(numBatches * batchDuration.milliseconds))
+      }
+
+      // Verify that the 3rd batch's block data is still present while the 3rd batch is incomplete
+      assert(getBlocksOfBatch(longBatchTime).nonEmpty, "blocks of incomplete batch already deleted")
+      assert(batchCounter.getNumCompletedBatches < longBatchNumber)
+      waitLatch.countDown()
+    }
+  }
+}
+
+object JobGeneratorSuite {
+  val waitLatch = new CountDownLatch(1)
+}
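
One detail worth noting in the test above: waitLatch lives in the companion
object rather than in the test body, presumably because the DStream graph
(including the foreachRDD function) gets serialized once checkpointing is
enabled, and a CountDownLatch is not serializable. A minimal sketch of the
pattern, with hypothetical names:

    import java.util.concurrent.CountDownLatch
    import org.apache.spark.streaming.dstream.DStream

    // State referenced from a serialized closure lives in a standalone object;
    // each JVM resolves it statically instead of deserializing it.
    object LatchHolder {
      val waitLatch = new CountDownLatch(1)
    }

    def blockBatch(stream: DStream[Int]): Unit = {
      stream.foreachRDD { rdd =>
        LatchHolder.waitLatch.await() // resolved via the object, not captured
      }
    }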

