Hi,

This may be the common zookeeper error in with Kafka-Storm integration.
When I run Kafka topology in the local cluster mode, it runs well.
In distributed mode, there are 2 supervisor and 1 nimbus. Zookeeper and
kafka are running on nimbus node. When I submit topology from nimbus node,
zookeeper gives following error in worker logs.


java.lang.RuntimeException: java.lang.RuntimeException:
org.apache.zookeeper.KeeperException$ConnectionLossException:
KeeperErrorCode = ConnectionLoss
        at 
storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:62)
        at storm.kafka.trident.ZkBrokerReader.(ZkBrokerReader.java:24)
        at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:35)
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:70)
        at 
backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864.invoke(executor.clj:519)
        at backtype.storm.util$async_loop$fn__384.invoke(util.clj:431)
        at clojure.lang.AFn.run(AFn.java:24)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException:
org.apache.zookeeper.KeeperException$ConnectionLossException:
KeeperErrorCode = ConnectionLoss
        at 
storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:75)
        at 
storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:48)
        ... 7 more
Caused by: org.apache.zookeeper.KeeperException$ConnectionLossException:
KeeperErrorCode = ConnectionLoss
        at 
com.netflix.curator.ConnectionState.getZooKeeper(ConnectionState.java:72)
        at 
com.netflix.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:74)
        at 
com.netflix.curator.framework.imps.CuratorFrameworkImpl.getZooKeeper(CuratorFrameworkImpl.java:353)
        at 
com.netflix.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:184)
        at 
com.netflix.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:173)
        at com.netflix.curator.RetryLoop.callWithRetry(RetryLoop.java:85)
        at 
com.netflix.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:169)
        at 
com.netflix.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:161)
        at 
com.netflix.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:36)
        at 
storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:72)
        ... 8 more

Here is the code for kafkaspout in topology :

brokerHosts = new ZkHosts(kafkaZookeeper);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, kafkaTopic, "",
"storm");
 spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = true;
spoutConf.zkPort = 2181;
 builder.setSpout("kafka-spout", new KafkaSpout(spoutConf), 1);

I am using kafka_2.9.2-0.8.1, zookeeper 3.4.6 and storm 0.9.1. How can I
overcome this error?


-Thanks
Nishu Tayal

Reply via email to