Repository: spark
Updated Branches:
  refs/heads/master e1bd70f44 -> 2c72a4432

[SPARK-16487][STREAMING] Fix some batches might not get marked as fully processed in JobGenerator

## What changes were proposed in this pull request?

In `JobGenerator`, the code reads as though some batches might not get marked as fully processed. In the following flowchart, the batch should get marked as fully processed before endpoint C; however, it is not. Currently, this does not actually cause an issue, because the condition `(time - zeroTime) is multiple of checkpoint duration?` always evaluates to `true`, as the `checkpoint duration` is always set equal to the `batch duration`.

![Flowchart](https://s31.postimg.org/udy9lti2j/spark_streaming_job_generator.png)

This PR fixes this issue to improve code readability and to avoid any potential problem should a future change allow the checkpoint duration to differ from the batch duration.

Author: Ahmed Mahran <ahmed.mah...@mashin.io>

Closes #14145 from ahmed-mahran/b-mark-batch-fully-processed.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c72a443
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c72a443
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c72a443

Branch: refs/heads/master
Commit: 2c72a4432b335f44a95feb340cebfd29488d1eb1
Parents: e1bd70f
Author: Ahmed Mahran <ahmed.mah...@mashin.io>
Authored: Fri Jul 22 12:39:12 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jul 22 12:39:12 2016 +0100

----------------------------------------------------------------------
 .../org/apache/spark/streaming/scheduler/JobGenerator.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c72a443/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 19c88f1..10d64f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -287,12 +287,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     markBatchFullyProcessed(time)
   }
 
-  /** Perform checkpoint for the give `time`. */
+  /** Perform checkpoint for the given `time`. */
   private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
     if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
       logInfo("Checkpointing graph for time " + time)
       ssc.graph.updateCheckpointData(time)
       checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
+    } else if (clearCheckpointDataLater) {
+      markBatchFullyProcessed(time)
     }
   }
 
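
To illustrate the scenario described in the commit message, the sketch below models the changed branch of `doCheckpoint` without any Spark dependency. It is not the `JobGenerator` code: `CheckpointSketch`, `markedBatches`, and `withFix` are hypothetical names, times are plain milliseconds, and it assumes that a batch whose checkpoint is written gets marked as fully processed once that checkpoint completes (modelled here as an immediate mark).

```scala
// Simplified, self-contained model of the branch changed in doCheckpoint.
// None of these names are Spark APIs; they exist only for illustration.
object CheckpointSketch {

  /** Returns the batch times that end up marked as fully processed.
    *
    * Assumption: when a checkpoint is written, the batch is marked later by
    * the checkpoint-completion path; this model treats that as an immediate mark.
    */
  def markedBatches(
      batchTimes: Seq[Long],
      zeroTime: Long,
      checkpointDurationMs: Long,
      withFix: Boolean): Seq[Long] = {
    val shouldCheckpoint = true          // checkpointing is enabled
    val clearCheckpointDataLater = true  // caller asked for cleanup afterwards
    batchTimes.filter { time =>
      val onCheckpointBoundary = (time - zeroTime) % checkpointDurationMs == 0
      if (shouldCheckpoint && onCheckpointBoundary) {
        true  // checkpoint written; batch marked when it completes (assumed)
      } else if (withFix && clearCheckpointDataLater) {
        true  // branch added by this commit: mark the batch right away
      } else {
        false // behavior before the fix: the batch is never marked
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(1000L, 2000L, 3000L, 4000L) // batch duration of 1000 ms
    // Checkpoint duration twice the batch duration, before and after the fix.
    println(markedBatches(batches, 0L, 2000L, withFix = false))
    println(markedBatches(batches, 0L, 2000L, withFix = true))
    // Checkpoint duration equal to the batch duration: no batch is skipped.
    println(markedBatches(batches, 0L, 1000L, withFix = false))
  }
}
```

With a checkpoint duration of 2000 ms and a batch duration of 1000 ms, the model without the fix marks only the batches at 2000 and 4000, while the model with the fix marks all four. When the two durations are equal, every batch lands on a checkpoint boundary, which matches the commit message's point that the gap is currently unobservable.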