[ 
https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Clasen updated KAFKA-873:
-------------------------------

    Status: Patch Available  (was: Open)

---
 core/build.sbt                                             |  2 ++
 core/src/main/scala/kafka/common/KafkaZookeperClient.scala | 12 ++++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/core/build.sbt b/core/build.sbt
index 405ea55..9e01605 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -13,6 +13,8 @@ libraryDependencies <+= scalaVersion("org.scala-lang" % 
"scala-compiler" % _ )
 libraryDependencies ++= Seq(
   "org.apache.zookeeper"  % "zookeeper"   % "3.3.4",
   "com.101tec"            % "zkclient"     % "0.2",
+  "com.netflix.curator"   % "curator-framework"     % "1.3.3" 
exclude("org.apache.zookeeper", "zookeeper"),
+  "com.netflix.curator"   % "curator-x-zkclient-bridge"  % "1.3.3" 
exclude("org.apache.zookeeper", "zookeeper"),
   "org.xerial.snappy"     % "snappy-java" % "1.0.4.1",
   "com.yammer.metrics"    % "metrics-core" % "2.2.0",
   "com.yammer.metrics"    % "metrics-annotation" % "2.2.0",
diff --git a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala 
b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
index bace1d2..aeae5c9 100644
--- a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
+++ b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
@@ -20,6 +20,9 @@ package kafka.common
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZKStringSerializer, ZKConfig}
 import java.util.concurrent.atomic.AtomicReference
+import com.netflix.curator.x.zkclientbridge.CuratorZKClientBridge
+import com.netflix.curator.framework.CuratorFrameworkFactory
+import com.netflix.curator.{RetrySleeper, RetryPolicy}

 object KafkaZookeeperClient {
   private val INSTANCE = new AtomicReference[ZkClient](null)
@@ -28,8 +31,13 @@ object KafkaZookeeperClient {
     // TODO: This cannot be a singleton since unit tests break if we do that
 //    INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, 
config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
 //                                              ZKStringSerializer))
-    INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, 
config.zkConnectionTimeoutMs,
-                                              ZKStringSerializer))
+    val policy = new RetryPolicy {
+      def allowRetry(retryCount: Int, elapsedTimeMs: Long, sleeper: 
RetrySleeper): Boolean = false
+    }
+    val curator = CuratorFrameworkFactory.newClient(config.zkConnect, 
config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, policy)
+    val bridge = new CuratorZKClientBridge(curator)
+    val client = new ZkClient(bridge,config.zkConnectionTimeoutMs, 
ZKStringSerializer)
+    INSTANCE.set(client)
     INSTANCE.get()
   }
 }
--
1.8.0.1
                
> Consider replacing zkclient with curator (with zkclient-bridge)
> ---------------------------------------------------------------
>
>                 Key: KAFKA-873
>                 URL: https://issues.apache.org/jira/browse/KAFKA-873
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.8
>            Reporter: Scott Clasen
>
> If zkclient was replaced with curator and curator-x-zkclient-bridge it would 
> be initially a drop-in replacement
> https://github.com/Netflix/curator/wiki/ZKClient-Bridge
> With the addition of a few more props to ZkConfig, and a bit of code this 
> would open up the possibility of using ACLs in zookeeper (which arent 
> supported directly by zkclient), as well as integrating with netflix 
> exhibitor for those of us using that.
> Looks like KafkaZookeeperClient needs some love anyhow...

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to