Repository: spark
Updated Branches:
  refs/heads/master e1bd70f44 -> 2c72a4432


[SPARK-16487][STREAMING] Fix some batches might not get marked as fully 
processed in JobGenerator

## What changes were proposed in this pull request?

In `JobGenerator`, the code reads as if some batches might never get marked 
as fully processed. In the following flowchart, the batch should be marked 
fully processed before reaching endpoint C, but it is not. Currently this does 
not actually cause a problem, because the condition `(time - zeroTime) is 
multiple of checkpoint duration?` always evaluates to `true`, as the 
`checkpoint duration` is always set equal to the `batch duration`.

![Flowchart](https://s31.postimg.org/udy9lti2j/spark_streaming_job_generator.png)

This PR fixes the issue, both to improve code readability and to avoid a 
potential bug should a future change allow the checkpoint duration to differ 
from the batch duration.
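
The control flow described above can be illustrated with a minimal, self-contained sketch. This is not Spark's actual code: `JobGeneratorSketch`, its constructor parameters, the two recording buffers, and `writeCheckpoint` are hypothetical stand-ins, times are plain milliseconds, and the sketch assumes that a completed checkpoint write is what eventually marks the batch as fully processed.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the checkpoint decision; not Spark's real classes.
// Times and durations are plain milliseconds (an assumption made for brevity).
final class JobGeneratorSketch(
    zeroTime: Long,
    checkpointDuration: Long,
    shouldCheckpoint: Boolean) {

  // Recorded only so the behaviour can be inspected from outside.
  val checkpointsWritten = mutable.ListBuffer.empty[Long]
  val fullyProcessed = mutable.ListBuffer.empty[Long]

  private def markBatchFullyProcessed(time: Long): Unit = fullyProcessed += time

  // Stand-in for the checkpoint writer. Assumption: once the write finishes and
  // clearCheckpointDataLater is set, the batch ends up marked as fully processed.
  private def writeCheckpoint(time: Long, clearCheckpointDataLater: Boolean): Unit = {
    checkpointsWritten += time
    if (clearCheckpointDataLater) markBatchFullyProcessed(time)
  }

  def doCheckpoint(time: Long, clearCheckpointDataLater: Boolean): Unit = {
    if (shouldCheckpoint && (time - zeroTime) % checkpointDuration == 0) {
      writeCheckpoint(time, clearCheckpointDataLater)
    } else if (clearCheckpointDataLater) {
      // The branch this change adds: no checkpoint is written for this batch,
      // so the batch has to be marked as fully processed here.
      markBatchFullyProcessed(time)
    }
  }
}

object CheckpointFlowDemo extends App {
  // Checkpoint duration (2000 ms) deliberately differs from the batch interval (1000 ms).
  val gen = new JobGeneratorSketch(zeroTime = 0L, checkpointDuration = 2000L, shouldCheckpoint = true)
  Seq(1000L, 2000L, 3000L, 4000L).foreach(t => gen.doCheckpoint(t, clearCheckpointDataLater = true))

  println(gen.checkpointsWritten.toList) // List(2000, 4000)
  println(gen.fullyProcessed.toList)     // List(1000, 2000, 3000, 4000)
}
```

In this sketch, dropping the `else if` branch would leave the batches at 1000 and 3000 unmarked, which is the gap the description refers to. When the two durations are equal, every batch takes the first branch and the difference is not observable.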

Author: Ahmed Mahran <ahmed.mah...@mashin.io>

Closes #14145 from ahmed-mahran/b-mark-batch-fully-processed.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c72a443
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c72a443
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c72a443

Branch: refs/heads/master
Commit: 2c72a4432b335f44a95feb340cebfd29488d1eb1
Parents: e1bd70f
Author: Ahmed Mahran <ahmed.mah...@mashin.io>
Authored: Fri Jul 22 12:39:12 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jul 22 12:39:12 2016 +0100

----------------------------------------------------------------------
 .../org/apache/spark/streaming/scheduler/JobGenerator.scala      | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c72a443/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 19c88f1..10d64f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -287,12 +287,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     markBatchFullyProcessed(time)
   }
 
-  /** Perform checkpoint for the give `time`. */
+  /** Perform checkpoint for the given `time`. */
   private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
     if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
       logInfo("Checkpointing graph for time " + time)
       ssc.graph.updateCheckpointData(time)
       checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
+    } else if (clearCheckpointDataLater) {
+      markBatchFullyProcessed(time)
     }
   }
 


