[FLINK-3107] [runtime] Start checkpoint ID counter with periodic scheduler

Problem: The job manager enables checkpoints during submission of streaming programs. This can lead to a call to `ZooKeeperCheckpointIDCounter.start()`, which communicates with ZooKeeper. This can block the job manager actor.
Solution: Start the counter in the `CheckpointCoordinatorDeActivator`. This closes #1610. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8df0bbac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8df0bbac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8df0bbac Branch: refs/heads/tableOnCalcite Commit: 8df0bbacb8712342471c12cbf765a0a92b70abc9 Parents: 6968a57 Author: Ufuk Celebi <u...@apache.org> Authored: Tue Feb 9 16:06:46 2016 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Wed Feb 10 19:51:59 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/checkpoint/CheckpointCoordinator.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8df0bbac/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 9963a20..b0e23d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -197,9 +197,9 @@ public class CheckpointCoordinator { this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS); this.userClassLoader = userClassLoader; - this.checkpointIdCounter = checkNotNull(checkpointIDCounter); - checkpointIDCounter.start(); + // Started with the periodic scheduler + this.checkpointIdCounter = checkNotNull(checkpointIDCounter); this.timer = new Timer("Checkpoint 
Timer", true); @@ -862,6 +862,14 @@ public class CheckpointCoordinator { // make sure all prior timers are cancelled stopCheckpointScheduler(); + try { + // Multiple start calls are OK + checkpointIdCounter.start(); + } catch (Exception e) { + String msg = "Failed to start checkpoint ID counter: " + e.getMessage(); + throw new RuntimeException(msg, e); + } + periodicScheduling = true; currentPeriodicTrigger = new ScheduledTrigger(); timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);