[ https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Scott Clasen updated KAFKA-873: ------------------------------- Status: Patch Available (was: Open) --- core/build.sbt | 2 ++ core/src/main/scala/kafka/common/KafkaZookeperClient.scala | 12 ++++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/core/build.sbt b/core/build.sbt index 405ea55..9e01605 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -13,6 +13,8 @@ libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _ ) libraryDependencies ++= Seq( "org.apache.zookeeper" % "zookeeper" % "3.3.4", "com.101tec" % "zkclient" % "0.2", + "com.netflix.curator" % "curator-framework" % "1.3.3" exclude("org.apache.zookeeper", "zookeeper"), + "com.netflix.curator" % "curator-x-zkclient-bridge" % "1.3.3" exclude("org.apache.zookeeper", "zookeeper"), "org.xerial.snappy" % "snappy-java" % "1.0.4.1", "com.yammer.metrics" % "metrics-core" % "2.2.0", "com.yammer.metrics" % "metrics-annotation" % "2.2.0", diff --git a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala index bace1d2..aeae5c9 100644 --- a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala +++ b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala @@ -20,6 +20,9 @@ package kafka.common import org.I0Itec.zkclient.ZkClient import kafka.utils.{ZKStringSerializer, ZKConfig} import java.util.concurrent.atomic.AtomicReference +import com.netflix.curator.x.zkclientbridge.CuratorZKClientBridge +import com.netflix.curator.framework.CuratorFrameworkFactory +import com.netflix.curator.{RetrySleeper, RetryPolicy} object KafkaZookeeperClient { private val INSTANCE = new AtomicReference[ZkClient](null) @@ -28,8 +31,13 @@ object KafkaZookeeperClient { // TODO: This cannot be a singleton since unit tests break if we do that // INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, // ZKStringSerializer)) - INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer)) + val policy = new RetryPolicy { + def allowRetry(retryCount: Int, elapsedTimeMs: Long, sleeper: RetrySleeper): Boolean = false + } + val curator = CuratorFrameworkFactory.newClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, policy) + val bridge = new CuratorZKClientBridge(curator) + val client = new ZkClient(bridge,config.zkConnectionTimeoutMs, ZKStringSerializer) + INSTANCE.set(client) INSTANCE.get() } } -- 1.8.0.1 > Consider replacing zkclient with curator (with zkclient-bridge) > --------------------------------------------------------------- > > Key: KAFKA-873 > URL: https://issues.apache.org/jira/browse/KAFKA-873 > Project: Kafka > Issue Type: Improvement > Affects Versions: 0.8 > Reporter: Scott Clasen > > If zkclient was replaced with curator and curator-x-zkclient-bridge it would > be initially a drop-in replacement > https://github.com/Netflix/curator/wiki/ZKClient-Bridge > With the addition of a few more props to ZkConfig, and a bit of code this > would open up the possibility of using ACLs in zookeeper (which arent > supported directly by zkclient), as well as integrating with netflix > exhibitor for those of us using that. > Looks like KafkaZookeeperClient needs some love anyhow... -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira