SAMZA-1364: Handle ZkExceptions in ZkCoordinationUtils.reset. In some cases, LocalApplicationRunner.waitForFinish blocks indefinitely after LocalApplicationRunner.kill. The last step in LocalApplicationRunner.kill(streamApp) is zkClient.close() (zkClient belongs to ZkCoordinationService).
ApplicationRunner.kill triggers a chain of listeners. In the final listener, zkClient.close throws ZkInterruptedException (a RuntimeException), and the listeners swallow it. This prevents the shutdownLatch in LocalApplicationRunner from being updated, which is required for a proper shutdown. Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Boris Shkolnik <bor...@apache.org>, Navina Ramesh <nav...@apache.org> Closes #246 from shanthoosh/SAMZA-1364 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d32e8bb3 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d32e8bb3 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d32e8bb3 Branch: refs/heads/0.14.0 Commit: d32e8bb3a64db5ffdcded3b299e10755e8679e0f Parents: 95d96b9 Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Tue Jul 25 17:29:08 2017 -0700 Committer: navina <nav...@apache.org> Committed: Tue Jul 25 17:29:08 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/zk/ZkCoordinationUtils.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d32e8bb3/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index df0a527..f5dda2e 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -18,13 +18,18 @@ */ package org.apache.samza.zk; +import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationUtils; import 
org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ZkCoordinationUtils implements CoordinationUtils { + private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtils.class); + public final ZkConfig zkConfig; public final ZkUtils zkUtils; public final String processorIdStr; @@ -37,7 +42,12 @@ public class ZkCoordinationUtils implements CoordinationUtils { @Override public void reset() { - zkUtils.close(); + try { + zkUtils.close(); + } catch (ZkInterruptedException ex) { + // Swallowing due to occurrence in the last stage of lifecycle(Not actionable). + LOG.error("Exception in reset: ", ex); + } } @Override