spark git commit: [SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start checkpoint

2015-03-19 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 540b2a4ea -> 645cf3fcc


[SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start 
checkpoint

This is another alternative approach to 
https://github.com/apache/spark/pull/4964/
I think this is a simpler fix that can be backported easily to other branches 
(1.2 and 1.3).

All it does is introduce a flag so that the pre-batch-start checkpoint does not 
call clear checkpoint.

There is no unit test yet. I will add it when this approach is commented upon. 
Not sure if this is testable easily.

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #5008 from tdas/SPARK-6222 and squashes the following commits:

7315bc2 [Tathagata Das] Removed empty line.
c438de4 [Tathagata Das] Revert unnecessary change.
5e98374 [Tathagata Das] Added unit test
50cb60b [Tathagata Das] Fixed style issue
295ca5c [Tathagata Das] Fixing SPARK-6222


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/645cf3fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/645cf3fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/645cf3fc

Branch: refs/heads/master
Commit: 645cf3fcc21987417b2946bdeeeb60af3edf667e
Parents: 540b2a4
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Thu Mar 19 02:15:50 2015 -0400
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Mar 19 02:15:50 2015 -0400

--
 .../org/apache/spark/streaming/Checkpoint.scala |  12 +-
 .../streaming/scheduler/JobGenerator.scala  |  20 +--
 .../streaming/scheduler/JobGeneratorSuite.scala | 133 +++
 3 files changed, 153 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/645cf3fc/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index cb4c94f..db64e11 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -119,7 +119,10 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
-  class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) 
extends Runnable {
+  class CheckpointWriteHandler(
+  checkpointTime: Time,
+  bytes: Array[Byte],
+  clearCheckpointDataLater: Boolean) extends Runnable {
 def run() {
   var attempts = 0
   val startTime = System.currentTimeMillis()
@@ -166,7 +169,7 @@ class CheckpointWriter(
   val finishTime = System.currentTimeMillis()
   logInfo("Checkpoint for time " + checkpointTime + " saved to file '" 
+ checkpointFile +
 "', took " + bytes.length + " bytes and " + (finishTime - 
startTime) + " ms")
-  jobGenerator.onCheckpointCompletion(checkpointTime)
+  jobGenerator.onCheckpointCompletion(checkpointTime, 
clearCheckpointDataLater)
   return
 } catch {
   case ioe: IOException =
@@ -180,7 +183,7 @@ class CheckpointWriter(
 }
   }
 
-  def write(checkpoint: Checkpoint) {
+  def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
 val bos = new ByteArrayOutputStream()
 val zos = compressionCodec.compressedOutputStream(bos)
 val oos = new ObjectOutputStream(zos)
@@ -188,7 +191,8 @@ class CheckpointWriter(
 oos.close()
 bos.close()
 try {
-  executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, 
bos.toByteArray))
+  executor.execute(new CheckpointWriteHandler(
+checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
   logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " to 
writer queue")
 } catch {
   case rej: RejectedExecutionException =

http://git-wip-us.apache.org/repos/asf/spark/blob/645cf3fc/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index ac92774..59488df 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -30,7 +30,8 @@ import org.apache.spark.util.{Clock, ManualClock}
 private[scheduler] sealed trait JobGeneratorEvent
 private[scheduler] case class GenerateJobs(time: Time) extends 
JobGeneratorEvent
 private[scheduler] case class ClearMetadata(time: Time) extends 

spark git commit: [SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start checkpoint

2015-03-19 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 1723f0591 -> 03e263f5b


[SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start 
checkpoint

This is another alternative approach to 
https://github.com/apache/spark/pull/4964/
I think this is a simpler fix that can be backported easily to other branches 
(1.2 and 1.3).

All it does is introduce a flag so that the pre-batch-start checkpoint does not 
call clear checkpoint.

There is no unit test yet. I will add it when this approach is commented upon. 
Not sure if this is testable easily.

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #5008 from tdas/SPARK-6222 and squashes the following commits:

7315bc2 [Tathagata Das] Removed empty line.
c438de4 [Tathagata Das] Revert unnecessary change.
5e98374 [Tathagata Das] Added unit test
50cb60b [Tathagata Das] Fixed style issue
295ca5c [Tathagata Das] Fixing SPARK-6222

(cherry picked from commit 645cf3fcc21987417b2946bdeeeb60af3edf667e)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03e263f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03e263f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03e263f5

Branch: refs/heads/branch-1.3
Commit: 03e263f5b527cf574f4ffcd5cd886f7723e3756e
Parents: 1723f05
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Thu Mar 19 02:15:50 2015 -0400
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Mar 19 02:16:07 2015 -0400

--
 .../org/apache/spark/streaming/Checkpoint.scala |  12 +-
 .../streaming/scheduler/JobGenerator.scala  |  20 +--
 .../streaming/scheduler/JobGeneratorSuite.scala | 133 +++
 3 files changed, 153 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/03e263f5/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 06e82f7..832ce78 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -119,7 +119,10 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
-  class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) 
extends Runnable {
+  class CheckpointWriteHandler(
+  checkpointTime: Time,
+  bytes: Array[Byte],
+  clearCheckpointDataLater: Boolean) extends Runnable {
 def run() {
   var attempts = 0
   val startTime = System.currentTimeMillis()
@@ -166,7 +169,7 @@ class CheckpointWriter(
   val finishTime = System.currentTimeMillis()
   logInfo("Checkpoint for time " + checkpointTime + " saved to file '" 
+ checkpointFile +
 "', took " + bytes.length + " bytes and " + (finishTime - 
startTime) + " ms")
-  jobGenerator.onCheckpointCompletion(checkpointTime)
+  jobGenerator.onCheckpointCompletion(checkpointTime, 
clearCheckpointDataLater)
   return
 } catch {
   case ioe: IOException =
@@ -180,7 +183,7 @@ class CheckpointWriter(
 }
   }
 
-  def write(checkpoint: Checkpoint) {
+  def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
 val bos = new ByteArrayOutputStream()
 val zos = compressionCodec.compressedOutputStream(bos)
 val oos = new ObjectOutputStream(zos)
@@ -188,7 +191,8 @@ class CheckpointWriter(
 oos.close()
 bos.close()
 try {
-  executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, 
bos.toByteArray))
+  executor.execute(new CheckpointWriteHandler(
+checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
   logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " to 
writer queue")
 } catch {
   case rej: RejectedExecutionException =

http://git-wip-us.apache.org/repos/asf/spark/blob/03e263f5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index ac92774..59488df 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -30,7 +30,8 @@ import org.apache.spark.util.{Clock, ManualClock}
 private[scheduler] sealed trait JobGeneratorEvent
 private[scheduler] case