Repository: samza Updated Branches: refs/heads/master e63d27c0c -> c89841366
SAMZA-1686: Set finite operation timeout when creating zkClient. Currently, zkClient is created with an operationRetryTimeout of -1. This causes zkClient to retry indefinitely in case of unrecoverable exceptions, thereby delaying the StreamProcessor shutdown. Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Jagadish Venkatraman <vjagadish1...@apache.org> Closes #487 from shanthoosh/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c8984136 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c8984136 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c8984136 Branch: refs/heads/master Commit: c89841366d53b9e2733dc72bbf91dea9382099a0 Parents: e63d27c Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Tue Apr 24 15:56:05 2018 -0700 Committer: Prateek Maheshwari <pmahe...@linkedin.com> Committed: Tue Apr 24 15:56:05 2018 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c8984136/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java index 8dd42c1..072a2f5 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java @@ -20,6 +20,7 @@ package org.apache.samza.zk; import com.google.common.base.Strings; import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.serialize.SerializableSerializer; import 
org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; @@ -54,7 +55,7 @@ public class ZkCoordinationUtilsFactory implements CoordinationUtilsFactory { public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { ZkClient zkClient; try { - zkClient = new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); + zkClient = new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs, new SerializableSerializer(), connectionTimeoutMs); } catch (Exception e) { // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e);