https://github.com/srotya/kafka-monitoring-tool
You can use this or the original project as a standalone monitoring tool for offsets.

On Thu, Feb 23, 2017 at 10:52 AM, pradeep s <sreekumar.prad...@gmail.com> wrote:

> Hi Priyank,
> The confusion is about what the proper implementation of the spout is.
>
> *Method 1*
> *=========*
> BrokerHosts hosts = new ZkHosts(zkConnString);
> SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" +
> topicName, spoutConfigId);
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
> This is working fine. The only disadvantage is that offsets are maintained
> in ZooKeeper. The latest Kafka docs say offsets can be maintained
> in Kafka's __consumer_offsets topic.
>
> I tried Method 2 to maintain offsets in Kafka.
> *Method 2*
> *=======*
> KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig();
>
> KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);
>
> private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
>
>     Map<String, Object> props = new HashMap<>();
>     props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers);
>     props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
>     props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "false");
>
>     String[] topics = new String[1];
>     topics[0] = topicName;
>
>     KafkaSpoutStreams kafkaSpoutStreams =
>             new KafkaSpoutStreams.Builder(new Fields("message"), new String[] { topicName }).build();
>             new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build();
>
>     KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
>             new KafkaSpoutTuplesBuilder.Builder<>(new TuplesBuilder(topicName)).build();
>             new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build();
>
>     KafkaSpoutConfig<String, String> spoutConf =
>             new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build();
>
>     return spoutConf;
> }
>
> But this method was showing spout errors, as in the attached logs.
>
> *Is it OK to maintain offsets in ZooKeeper?* Are there any disadvantages to this
> approach? Please suggest.
>
> Logs attached.
>
> Regards
> Pradeep S
>
> On Wed, Feb 22, 2017 at 5:25 PM, Priyank Shah <ps...@hortonworks.com>
> wrote:
>
>> Hi Pradeep,
>>
>> Both the spouts are correct and good to use. However, we should use the
>> newer spout, the one that uses broker hosts. The reason is that it uses the
>> latest Kafka consumer API. When do you get the commit failed exception? Can
>> you send your worker logs where the spout is running? If you don’t see
>> anything in the logs, try changing the log level from the UI.
>>
>> *From: *pradeep s <sreekumar.prad...@gmail.com>
>> *Reply-To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Date: *Tuesday, February 21, 2017 at 5:14 PM
>> *To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Subject: *Re: Storm Kafka offset monitoring
>>
>> Hi Priyank
>>
>> Currently I tested the spout using ZooKeeper broker hosts. This is
>> processing fine. But I saw another way of initialising the spout using Kafka
>> bootstrap servers with the KafkaSpoutConfig class.
>>
>> https://storm.apache.org/releases/1.0.2/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
>>
>> But implementing it this way, I was getting a commit failed exception due to
>> rebalance.
>>
>> Can you point out the proper way of implementing the Kafka spout?
>>
>> In the Storm 1.0.3 docs I have seen the approach using ZooKeeper broker hosts.
>>
>> Regards
>>
>> Pradeep S
>>
>> On Tue, Feb 21, 2017 at 2:35 PM Priyank Shah <ps...@hortonworks.com>
>> wrote:
>>
>> Hi Pradeep,
>>
>> A release vote for RC1 of 1.1.0 is in progress. You can track that and
>> once it gets released you can upgrade.
>>
>> Regarding upgrading your spout, you don’t need to make any code changes.
>> You just need to use the latest released spout code. Usually it involves
>> updating a pom file that has a dependency on the storm-kafka module to the
>> latest version.
>>
>> *From: *pradeep s <sreekumar.prad...@gmail.com>
>> *Reply-To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Date: *Tuesday, February 21, 2017 at 12:49 PM
>> *To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Subject: *Re: Storm Kafka offset monitoring
>>
>> Hi Priyank
>>
>> Thanks for your reply. I was not able to find the 1.1.0 version of Storm.
>>
>> Can you please point me to it? Also, can you please confirm what
>> specific spout changes to make?
>>
>> Regards
>>
>> Pradeep S
>>
>> On Tue, Feb 21, 2017 at 10:54 AM Priyank Shah <ps...@hortonworks.com>
>> wrote:
>>
>> Hi Pradeep,
>>
>> If you upgrade your spout in the topology and the Storm code to a later
>> version (I checked v1.1.0 and it has the tool), you will get a table in the
>> Storm UI which shows you offsets. If you cannot upgrade then I think you will
>> have to do it manually.
>>
>> *From: *pradeep s <sreekumar.prad...@gmail.com>
>> *Reply-To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Date: *Tuesday, February 21, 2017 at 9:44 AM
>> *To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Subject: *Storm Kafka offset monitoring
>>
>> Hi,
>>
>> I am using Storm 1.0.2 and Kafka 0.10.1.1. The Storm spout is
>> configured using ZooKeeper broker hosts. Is there a monitoring UI which I
>> can use to track the consumer offsets and lag?
>>
>> I was using Yahoo Kafka Manager, but it's showing the Storm spout as a
>> consumer. Any help?
>>
>> Regards
>>
>> Pradeep S
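
For anyone tracking lag by hand, as suggested in the thread for setups that cannot upgrade: per-partition lag is conventionally the partition's log-end offset minus the consumer's committed offset. Below is a minimal sketch of that arithmetic in plain Java. The class and method names (OffsetLag, lagPerPartition) and the sample offsets are hypothetical, fetching the offsets (from ZooKeeper for the older spout, or from Kafka for the newer one) is deliberately left out, and treating a partition with no committed offset as starting at 0 is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: names and sample data are illustrative only.
public class OffsetLag {

    // Returns lag per partition: log-end offset minus committed offset.
    // Assumption: a partition with no committed offset is treated as committed at 0.
    public static Map<Integer, Long> lagPerPartition(Map<Integer, Long> logEndOffsets,
                                                     Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>(); // sorted by partition id
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            // Clamp at zero in case the committed offset was read after the log-end offset.
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    public static void main(String[] args) {
        // Sample offsets, made up for the example.
        Map<Integer, Long> logEnd = new HashMap<>();
        logEnd.put(0, 150L);
        logEnd.put(1, 80L);

        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 100L); // partition 1 has no committed offset yet

        System.out.println(lagPerPartition(logEnd, committed));
    }
}
```

The two input maps would be filled from whichever store holds the spout's offsets; only the subtraction is shown here.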