https://github.com/srotya/kafka-monitoring-tool

You can use this or the original project as a standalone monitoring tool
for offsets.

On Thu, Feb 23, 2017 at 10:52 AM, pradeep s <sreekumar.prad...@gmail.com>
wrote:

> Hi Priyank,
> The confusion is on whats the proper implementation of spout .
>
> *Method 1*
> *=========*
> BrokerHosts hosts = new ZkHosts(zkConnString);
>         SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" +
> topicName, spoutConfigId);
>         spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
> This is working fine . Only disadvanatage is that offsets are maintained
> in zookeeper . As per latest kafka docs , it says offsets can be maintained
> in Kafka consumerOffsets topic.
>
> Tried with method 2 for maintaining offsets in Kafka.
> *Method 2*
> *=======*
>  KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig();
>
>         KafkaSpout<String, String> spout = new
> KafkaSpout<>(kafkaSpoutConfig);
>
> private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
>
>         Map<String, Object> props = new HashMap<>();
>         props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS,
> bootstrapServers);
>         props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
>         props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
>                 "org.apache.kafka.common.serialization.
> StringDeserializer");
>         props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
>                 "org.apache.kafka.common.serialization.
> StringDeserializer");
>         props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "false");
>
>
>         String[] topics = new String[1];
>         topics[0] = topicName;
>
>         KafkaSpoutStreams kafkaSpoutStreams =
>                 new KafkaSpoutStreams.Builder(new Fields("message"), new
> String[] { topicName }).build();
>                 new KafkaSpoutStreamsNamedTopics.Builder(new
> Fields("message"), topics).build();
>
>         KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
>                 new KafkaSpoutTuplesBuilder.Builder<>(new
> TuplesBuilder(topicName)).build();
>                 new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new
> TuplesBuilder(topicName)).build();
>
>         KafkaSpoutConfig<String, String> spoutConf =
>                 new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams,
> tuplesBuilder).build();
>
>         return spoutConf;
>     }
>
>
> But this method was showing spout errors as attached in logs.
>
> *Is it ok to maintain offsets in zookeeper *. Any disadvantages with this
> approach. Please suggest.
>
> Logs attached .
>
> Regards
> Pradeep S
>
>
>
> On Wed, Feb 22, 2017 at 5:25 PM, Priyank Shah <ps...@hortonworks.com>
> wrote:
>
>> Hi Pradeep,
>>
>>
>>
>> Both the spouts are correct and good to use. However, we should use the
>> newer spout. The one that is uses broker hosts. Reason is it uses latest
>> kafka consumer api. When do you get commit failed exception? Can you send
>> your worker logs where the spout is running? If you don’t see anything In
>> the logs try changing the log level from ui.
>>
>>
>>
>> *From: *pradeep s <sreekumar.prad...@gmail.com>
>> *Reply-To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Date: *Tuesday, February 21, 2017 at 5:14 PM
>>
>> *To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Subject: *Re: Storm Kafka offset monitoring
>>
>>
>>
>> Hi Priyank
>>
>> Currently I tested the spout using zookeeper broker hosts .This is
>> processing fine.But I saw another way of initialising spout using Kafka
>> bootstrap severs using kafkaspoutconfig class.
>>
>>
>>
>> https://storm.apache.org/releases/1.0.2/javadocs/org/apache/
>> storm/kafka/spout/KafkaSpoutConfig.Builder.html
>>
>>
>>
>>
>>
>> But implementing this way I was getting commit failed exception due to
>> rebalance
>>
>> Can you point out what's the proper way for implementing Kafka spout.
>>
>> In storm 1.0.3 docs I have seen the way using zookeeper broker hosts
>>
>> Regards
>>
>> Pradeep S
>>
>> On Tue, Feb 21, 2017 at 2:35 PM Priyank Shah <ps...@hortonworks.com>
>> wrote:
>>
>> Hi Pradeep,
>>
>>
>>
>> A release vote for RC1 of 1.1.0 is in progress. You can track that and
>> once it gets released you can upgrade.
>>
>>
>>
>> Regarding upgrading your spout, you don’t need to make any code changes.
>> You just need to use the latest released spout code. Usually it involves
>> updating a pom file that has a dependency on storm-kafka module to latest
>> version.
>>
>>
>>
>> *From: *pradeep s <sreekumar.prad...@gmail.com>
>> *Reply-To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Date: *Tuesday, February 21, 2017 at 12:49 PM
>> *To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Subject: *Re: Storm Kafka offset monitoring
>>
>>
>>
>> Hi Priyank
>>
>> Thanks for your reply.i was not able to find 1.1.0 version of storm
>>
>> Can you please point to that.also can you please  confirm on what
>> specific spout changes to make.
>>
>> Regards
>>
>> Pradeep S
>>
>> On Tue, Feb 21, 2017 at 10:54 AM Priyank Shah <ps...@hortonworks.com>
>> wrote:
>>
>> Hi Pradeep,
>>
>>
>>
>> If you upgrade your spout in the topology and storm code to a later
>> version(I checked v1.1.0 and it has the tool) you will get a table in storm
>> ui which show you offsets. If you cannot upgrade then I think you will have
>> to do it manually.
>>
>>
>>
>> *From: *pradeep s <sreekumar.prad...@gmail.com>
>> *Reply-To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Date: *Tuesday, February 21, 2017 at 9:44 AM
>> *To: *"user@storm.apache.org" <user@storm.apache.org>
>> *Subject: *Storm Kafka offset monitoring
>>
>>
>>
>> Hi ,
>>
>> I am using Storm 1.0.2 and Kafka 0.10.1.1 versions . Storm spout is
>> configured using zookeeper broker hosts. Is there a monitoring ui which i
>> can use to track the consumer offsets and lag.
>>
>> I was using yahoo kafka manager , but its showing storm spout as a
>> consumer.Any help?
>>
>> Regards
>>
>> Pradeep S
>>
>>
>

Reply via email to