SAMZA-1364: Handle ZkExceptions in ZkCoordinationUtils.reset.

In some cases, LocalApplicationRunner.waitForFinish blocks indefinitely after
LocalApplicationRunner.kill. The last step in
LocalApplicationRunner.kill(streamApp) is zkClient.close() (the zkClient
belongs to ZkCoordinationService).

ApplicationRunner.kill triggers a chain of listeners, and in the final listener
zkClient.close throws ZkInterruptedException (a RuntimeException). The
exception is swallowed in the listeners, which prevents the shutdownLatch
update in LocalApplicationRunner that is required for a proper shutdown.

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Boris Shkolnik <bor...@apache.org>, Navina Ramesh <nav...@apache.org>

Closes #246 from shanthoosh/SAMZA-1364


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d32e8bb3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d32e8bb3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d32e8bb3

Branch: refs/heads/0.14.0
Commit: d32e8bb3a64db5ffdcded3b299e10755e8679e0f
Parents: 95d96b9
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Tue Jul 25 17:29:08 2017 -0700
Committer: navina <nav...@apache.org>
Committed: Tue Jul 25 17:29:08 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/zk/ZkCoordinationUtils.java   | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d32e8bb3/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index df0a527..f5dda2e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -18,13 +18,18 @@
  */
 package org.apache.samza.zk;
 
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ZkCoordinationUtils implements CoordinationUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtils.class);
+
   public final ZkConfig zkConfig;
   public final ZkUtils zkUtils;
   public final String processorIdStr;
@@ -37,7 +42,12 @@ public class ZkCoordinationUtils implements 
CoordinationUtils {
 
   @Override
   public void reset() {
-    zkUtils.close();
+    try {
+      zkUtils.close();
+    } catch (ZkInterruptedException ex) {
+      // Swallowing due to occurrence in the last stage of lifecycle(Not actionable).
+      LOG.error("Exception in reset: ", ex);
+    }
   }
 
   @Override

Reply via email to