spark git commit: [SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start checkpoint
Repository: spark Updated Branches: refs/heads/master 540b2a4ea -> 645cf3fcc [SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start checkpoint This is another alternative approach to https://github.com/apache/spark/pull/4964/ I think this is a simpler fix that can be backported easily to other branches (1.2 and 1.3). All it does is introduce a flag so that the pre-batch-start checkpoint does not call clear checkpoint. There is no unit test yet. I will add it when this approach is commented upon. Not sure if this is testable easily. Author: Tathagata Das tathagata.das1...@gmail.com Closes #5008 from tdas/SPARK-6222 and squashes the following commits: 7315bc2 [Tathagata Das] Removed empty line. c438de4 [Tathagata Das] Revert unnecessary change. 5e98374 [Tathagata Das] Added unit test 50cb60b [Tathagata Das] Fixed style issue 295ca5c [Tathagata Das] Fixing SPARK-6222 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/645cf3fc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/645cf3fc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/645cf3fc Branch: refs/heads/master Commit: 645cf3fcc21987417b2946bdeeeb60af3edf667e Parents: 540b2a4 Author: Tathagata Das tathagata.das1...@gmail.com Authored: Thu Mar 19 02:15:50 2015 -0400 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Mar 19 02:15:50 2015 -0400 -- .../org/apache/spark/streaming/Checkpoint.scala | 12 +- .../streaming/scheduler/JobGenerator.scala | 20 +-- .../streaming/scheduler/JobGeneratorSuite.scala | 133 +++ 3 files changed, 153 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/645cf3fc/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index cb4c94f..db64e11 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -119,7 +119,10 @@ class CheckpointWriter( private var stopped = false private var fs_ : FileSystem = _ - class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { + class CheckpointWriteHandler( + checkpointTime: Time, + bytes: Array[Byte], + clearCheckpointDataLater: Boolean) extends Runnable { def run() { var attempts = 0 val startTime = System.currentTimeMillis() @@ -166,7 +169,7 @@ class CheckpointWriter( val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") - jobGenerator.onCheckpointCompletion(checkpointTime) + jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater) return } catch { case ioe: IOException => @@ -180,7 +183,7 @@ class CheckpointWriter( } } - def write(checkpoint: Checkpoint) { + def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) { val bos = new ByteArrayOutputStream() val zos = compressionCodec.compressedOutputStream(bos) val oos = new ObjectOutputStream(zos) @@ -188,7 +191,8 @@ class CheckpointWriter( oos.close() bos.close() try { - executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + executor.execute(new CheckpointWriteHandler( +checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater)) logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => http://git-wip-us.apache.org/repos/asf/spark/blob/645cf3fc/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 
ac92774..59488df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -30,7 +30,8 @@ import org.apache.spark.util.{Clock, ManualClock} private[scheduler] sealed trait JobGeneratorEvent private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent private[scheduler] case class ClearMetadata(time: Time) extends
spark git commit: [SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start checkpoint
Repository: spark Updated Branches: refs/heads/branch-1.3 1723f0591 -> 03e263f5b [SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start checkpoint This is another alternative approach to https://github.com/apache/spark/pull/4964/ I think this is a simpler fix that can be backported easily to other branches (1.2 and 1.3). All it does is introduce a flag so that the pre-batch-start checkpoint does not call clear checkpoint. There is no unit test yet. I will add it when this approach is commented upon. Not sure if this is testable easily. Author: Tathagata Das tathagata.das1...@gmail.com Closes #5008 from tdas/SPARK-6222 and squashes the following commits: 7315bc2 [Tathagata Das] Removed empty line. c438de4 [Tathagata Das] Revert unnecessary change. 5e98374 [Tathagata Das] Added unit test 50cb60b [Tathagata Das] Fixed style issue 295ca5c [Tathagata Das] Fixing SPARK-6222 (cherry picked from commit 645cf3fcc21987417b2946bdeeeb60af3edf667e) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03e263f5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03e263f5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03e263f5 Branch: refs/heads/branch-1.3 Commit: 03e263f5b527cf574f4ffcd5cd886f7723e3756e Parents: 1723f05 Author: Tathagata Das tathagata.das1...@gmail.com Authored: Thu Mar 19 02:15:50 2015 -0400 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Mar 19 02:16:07 2015 -0400 -- .../org/apache/spark/streaming/Checkpoint.scala | 12 +- .../streaming/scheduler/JobGenerator.scala | 20 +-- .../streaming/scheduler/JobGeneratorSuite.scala | 133 +++ 3 files changed, 153 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/03e263f5/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 06e82f7..832ce78 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -119,7 +119,10 @@ class CheckpointWriter( private var stopped = false private var fs_ : FileSystem = _ - class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { + class CheckpointWriteHandler( + checkpointTime: Time, + bytes: Array[Byte], + clearCheckpointDataLater: Boolean) extends Runnable { def run() { var attempts = 0 val startTime = System.currentTimeMillis() @@ -166,7 +169,7 @@ class CheckpointWriter( val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") - jobGenerator.onCheckpointCompletion(checkpointTime) + jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater) return } catch { case ioe: IOException => @@ -180,7 +183,7 @@ class CheckpointWriter( } } - def write(checkpoint: Checkpoint) { + def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) { val bos = new ByteArrayOutputStream() val zos = compressionCodec.compressedOutputStream(bos) val oos = new ObjectOutputStream(zos) @@ -188,7 +191,8 @@ class CheckpointWriter( oos.close() bos.close() try { - executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + executor.execute(new CheckpointWriteHandler( +checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater)) logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => http://git-wip-us.apache.org/repos/asf/spark/blob/03e263f5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala -- diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index ac92774..59488df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -30,7 +30,8 @@ import org.apache.spark.util.{Clock, ManualClock} private[scheduler] sealed trait JobGeneratorEvent private[scheduler] case