Repository: samza Updated Branches: refs/heads/master 4eb515313 -> 1c1139399
SAMZA-1368; make sure a new job model is generated in case of barrier timeout. Author: Boris Shkolnik <bor...@apache.org> Reviewers: Shanthoosh V <svenk...@linkedin.com> Closes #247 from sborya/onBarrierTimeout1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c113939 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c113939 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c113939 Branch: refs/heads/master Commit: 1c1139399599c1cb31249e8d7a28291e2ad9d27e Parents: 4eb5153 Author: Boris Shkolnik <bor...@apache.org> Authored: Fri Jul 21 15:32:58 2017 -0700 Committer: Jagadish <jagad...@apache.org> Committed: Fri Jul 21 15:32:58 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/zk/ZkJobCoordinator.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1c113939/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index dd08e3f..e973099 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -313,11 +313,17 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version)); } else { if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) { - // no-op - // In our consensus model, if the Barrier is timed-out, then it means that one or more initial participants failed to join.
That means, they should have de-registered from "processors" list - // and that would have triggered onProcessorChange action -> a new round of consensus. - LOG.info("Barrier for version " + version + " timed out."); + // no-op for non-leaders + // for leader: make sure we do not stop - so generate a new job model + LOG.warn("Barrier for version " + version + " timed out."); + if (zkController.isLeader()) { + LOG.info("Leader will schedule a new job model generation"); + debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () -> + { + // actual actions to do are the same as onProcessorChange + doOnProcessorChange(new ArrayList<>()); + }); + } } } }