[FLINK-3107] [runtime] Start checkpoint ID counter with periodic scheduler

Problem: The job manager enables checkpoints during submission of streaming
programs. This can lead to a call to
`ZooKeeperCheckpointIDCounter.start()`,
which communicates with ZooKeeper and can therefore block the job manager actor.

Solution: Start the counter in the `CheckpointCoordinatorDeActivator`.

This closes #1610.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8df0bbac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8df0bbac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8df0bbac

Branch: refs/heads/tableOnCalcite
Commit: 8df0bbacb8712342471c12cbf765a0a92b70abc9
Parents: 6968a57
Author: Ufuk Celebi <u...@apache.org>
Authored: Tue Feb 9 16:06:46 2016 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Wed Feb 10 19:51:59 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/CheckpointCoordinator.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8df0bbac/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9963a20..b0e23d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -197,9 +197,9 @@ public class CheckpointCoordinator {
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
                this.recentPendingCheckpoints = new 
ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
                this.userClassLoader = userClassLoader;
-               this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 
-               checkpointIDCounter.start();
+               // Started with the periodic scheduler
+               this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 
                this.timer = new Timer("Checkpoint Timer", true);
 
@@ -862,6 +862,14 @@ public class CheckpointCoordinator {
                        // make sure all prior timers are cancelled
                        stopCheckpointScheduler();
 
+                       try {
+                               // Multiple start calls are OK
+                               checkpointIdCounter.start();
+                       } catch (Exception e) {
+                               String msg = "Failed to start checkpoint ID 
counter: " + e.getMessage();
+                               throw new RuntimeException(msg, e);
+                       }
+
                        periodicScheduling = true;
                        currentPeriodicTrigger = new ScheduledTrigger();
                        timer.scheduleAtFixedRate(currentPeriodicTrigger, 
baseInterval, baseInterval);

Reply via email to