Repository: spark
Updated Branches:
  refs/heads/master 936bc0bcb -> 928d63162


[SPARK-11740][STREAMING] Fix the race condition of two checkpoints in a batch

We checkpoint both when generating a batch and when completing a batch. When the 
processing time of a batch is greater than the batch interval, the checkpoint 
for completing an old batch may run after the checkpoint for generating a new 
batch. If this happens, the checkpoint of the old batch actually has the latest 
information, so we want to recover from it. This PR uses the latest 
checkpoint time as the file name, so that we can always recover from the 
latest checkpoint file.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9707 from zsxwing/fix-checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/928d6316
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/928d6316
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/928d6316

Branch: refs/heads/master
Commit: 928d631625297857fb6998fbeb0696917fbfd60f
Parents: 936bc0b
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Nov 17 14:48:29 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Nov 17 14:48:29 2015 -0800

----------------------------------------------------------------------
 python/pyspark/streaming/tests.py               |  9 +++----
 .../org/apache/spark/streaming/Checkpoint.scala | 18 +++++++++++--
 .../spark/streaming/CheckpointSuite.scala       | 27 ++++++++++++++++++--
 3 files changed, 45 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/928d6316/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index 2983028..ff95639 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -753,7 +753,6 @@ class CheckpointTests(unittest.TestCase):
         if self.cpd is not None:
             shutil.rmtree(self.cpd)
 
-    @unittest.skip("Enable it when we fix the checkpoint bug")
     def test_get_or_create_and_get_active_or_create(self):
         inputd = tempfile.mkdtemp()
         outputd = tempfile.mkdtemp() + "/"
@@ -822,11 +821,11 @@ class CheckpointTests(unittest.TestCase):
         # Verify that getOrCreate() uses existing SparkContext
         self.ssc.stop(True, True)
         time.sleep(1)
-        sc = SparkContext(SparkConf())
+        self.sc = SparkContext(conf=SparkConf())
         self.setupCalled = False
         self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
         self.assertFalse(self.setupCalled)
-        self.assertTrue(self.ssc.sparkContext == sc)
+        self.assertTrue(self.ssc.sparkContext == self.sc)
 
         # Verify the getActiveOrCreate() recovers from checkpoint files
         self.ssc.stop(True, True)
@@ -845,11 +844,11 @@ class CheckpointTests(unittest.TestCase):
         # Verify that getActiveOrCreate() uses existing SparkContext
         self.ssc.stop(True, True)
         time.sleep(1)
-        self.sc = SparkContext(SparkConf())
+        self.sc = SparkContext(conf=SparkConf())
         self.setupCalled = False
         self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
         self.assertFalse(self.setupCalled)
-        self.assertTrue(self.ssc.sparkContext == sc)
+        self.assertTrue(self.ssc.sparkContext == self.sc)
 
         # Verify that getActiveOrCreate() calls setup() in absence of 
checkpoint files
         self.ssc.stop(True, True)

http://git-wip-us.apache.org/repos/asf/spark/blob/928d6316/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0cd55d9..fd0e8d5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -187,16 +187,30 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
+  @volatile private var latestCheckpointTime: Time = null
+
   class CheckpointWriteHandler(
       checkpointTime: Time,
       bytes: Array[Byte],
       clearCheckpointDataLater: Boolean) extends Runnable {
     def run() {
+      if (latestCheckpointTime == null || latestCheckpointTime < 
checkpointTime) {
+        latestCheckpointTime = checkpointTime
+      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
-      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, 
checkpointTime)
-      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, 
checkpointTime)
+      // We will do checkpoint when generating a batch and completing a batch. 
When the processing
+      // time of a batch is greater than the batch interval, checkpointing for 
completing an old
+      // batch may run after checkpointing of a new batch. If this happens, 
checkpoint of an old
+      // batch actually has the latest information, so we want to recover 
from it. Therefore, we
+      // also use the latest checkpoint time as the file name, so that we can 
recover from the
+      // latest checkpoint file.
+      //
+      // Note: there is only one thread writing the checkpoint files, so we 
don't need to worry
+      // about thread-safety.
+      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, 
latestCheckpointTime)
+      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, 
latestCheckpointTime)
 
       while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/928d6316/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 84f5294..b1cbc71 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.streaming
 
 import java.io.{ObjectOutputStream, ByteArrayOutputStream, 
ByteArrayInputStream, File}
-import org.apache.spark.TestUtils
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -30,11 +29,13 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => 
NewTextOutputFormat}
+import org.mockito.Mockito.mock
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.TestUtils
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
-import org.apache.spark.streaming.scheduler.{ConstantEstimator, 
RateTestInputDStream, RateTestReceiver}
+import org.apache.spark.streaming.scheduler._
 import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
 
 /**
@@ -611,6 +612,28 @@ class CheckpointSuite extends TestSuiteBase {
     assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
   }
 
+  test("SPARK-11267: the race condition of two checkpoints in a batch") {
+    val jobGenerator = mock(classOf[JobGenerator])
+    val checkpointDir = Utils.createTempDir().toString
+    val checkpointWriter =
+      new CheckpointWriter(jobGenerator, conf, checkpointDir, new 
Configuration())
+    val bytes1 = Array.fill[Byte](10)(1)
+    new checkpointWriter.CheckpointWriteHandler(
+      Time(2000), bytes1, clearCheckpointDataLater = false).run()
+    val bytes2 = Array.fill[Byte](10)(2)
+    new checkpointWriter.CheckpointWriteHandler(
+      Time(1000), bytes2, clearCheckpointDataLater = true).run()
+    val checkpointFiles = 
Checkpoint.getCheckpointFiles(checkpointDir).reverse.map { path =>
+      new File(path.toUri)
+    }
+    assert(checkpointFiles.size === 2)
+    // Although bytes2 was written with an old time, it contains the latest 
status, so we should
+    // try to read from it at first.
+    assert(Files.toByteArray(checkpointFiles(0)) === bytes2)
+    assert(Files.toByteArray(checkpointFiles(1)) === bytes1)
+    checkpointWriter.stop()
+  }
+
   /**
    * Tests a streaming operation under checkpointing, by restarting the 
operation
    * from checkpoint file and verifying whether the final output is correct.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to