Hi Guys,
*I discovered how the offsets are managed: they are stored in ZooKeeper.*
I queried ZooKeeper by connecting with zkCli.sh:
[zk: localhost:2181(CONNECTED) 2] get /exampleTopic_1/exampleTopic_1/partition_0
{"topology":{"id":"8543e932-48ed-4902-97cb-f3a21696563f","name":"example-topology"},"offset":416,"partition":0,"broker":{"host":"kafka-2.c.high-plating-825.internal","port":9092},"topic":"exampleTopic_1"}
cZxid = 0x102486042
ctime = Tue Jun 09 02:56:20 UTC 2015
mZxid = 0x4007b04d0
mtime = Sun Jun 21 16:13:24 UTC 2015
pZxid = 0x102486042
cversion = 0
dataVersion = 7153
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 201
numChildren = 0
*The latest (log-end) offsets per partition for the topic are as follows.*
I logged in to a Kafka host and ran this command:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093 --topic exampleTopic_1 --time -1
exampleTopic_1:0:67302
exampleTopic_1:1:70455
exampleTopic_1:2:67990
exampleTopic_1:3:73161
exampleTopic_1:4:76792
exampleTopic_1:5:83197
exampleTopic_1:6:75131
exampleTopic_1:7:64840
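For what it's worth, the per-partition lag is just the log-end offset reported by GetOffsetShell minus the offset committed to ZooKeeper. A minimal sketch (the class and method names are mine; the offsets are taken from the two samples above):

```java
// Lag per partition = log-end offset (from GetOffsetShell) minus the
// offset the spout has committed to ZooKeeper.
public class LagCheck {
    public static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        // Partition 0: log-end 67302, committed 416 (from the samples above)
        System.out.println("partition 0 lag = " + lag(67302L, 416L));
    }
}
```

For partition 0 this gives 66886, i.e. the spout has barely moved from the start of the log.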
From these two samples I conclude that my KafkaSpout is lagging far behind:
for partition 0 the committed offset in ZooKeeper is 416, while the log-end
offset is 67302.
*Question 1:*
What should I do to increase the rate at which my consumers process
messages (in my case the consumer is a Bolt)?
Should I increase the number of supervisors? If so, will the new workers
read from the same committed offset, which is 416 for partition_0?
How should I handle a backed-up topology so that it can catch up on all the
messages?
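For reference, here is how I would raise the parallelism if that is the right knob (component names and counts here are hypothetical, and this assumes the TopologyBuilder API as in Storm 0.9.x):

```java
// Hypothetical wiring: one spout executor per partition (8 here), and a
// larger bolt parallelism so the bolt does not become the bottleneck.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 8);
builder.setBolt("example-bolt", new ExampleBolt(), 16)
       .shuffleGrouping("kafka-spout");
```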
*Question 2:*
I see that the same messages are consumed more than once by my topology.
My topology uses a KafkaSpout and an ExampleBolt. I want to enforce strict
exactly-once semantics: "a message is processed exactly once, not at least
once." Can I achieve that with KafkaSpout?
I added instrumentation so that whenever a message is executed by the Bolt,
I update a counter.
The results are as follows:
 msg_id                               | c
--------------------------------------+---
 32cd1320-17dd-11e5-8080-808080808080 | 3
 3136e2c0-17dd-11e5-8080-808080808080 | 3
The above Cassandra table shows that message
32cd1320-17dd-11e5-8080-808080808080 was processed 3 times (indicated by
the c counter column).
I published only two messages, but in most cases the counter ends up at 2,
3, or 6. I want it to be exactly 1.
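One idea I am considering (an assumption on my part, not something I have confirmed; as far as I understand, KafkaSpout by itself gives at-least-once delivery): make the bolt idempotent by recording each msg_id and acting only the first time it is seen, instead of incrementing a counter that double-counts on replay. A minimal sketch, with an in-memory set standing in for the real external store:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of an idempotent consumer: record each msg_id and only act
// the first time it is seen, so replayed tuples do not re-increment
// the counter. In production the seen-set would live in an external
// store (e.g. a Cassandra table keyed by msg_id); a HashSet stands in.
public class DedupGuard {
    private final Set<String> seen = new HashSet<>();

    // Returns true only the first time this msgId is processed.
    public boolean processOnce(String msgId) {
        return seen.add(msgId);
    }
}
```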
Please advise if you have any thoughts.
Thanks,
Venkat
On Tue, Jun 16, 2015 at 6:38 PM, Venkat Mantirraju <[email protected]> wrote:
> Hi All,
> I have a question that I would like your input on.
>
> Subject: finding offsets of current consumers in kafka-storm
>
> I have a topic with 8 partitions, and my Kafka brokers are running on 2
> servers.
>
> I am running Storm topology with Kafka Spout.
> I would like to know the current offset of my KafkaSpout and Kafka's
> latest position in the topic.
>
> What is the best way to find the current offset position of my
> KafkaSpout, and what is the delta?
>
> My KafkaSpout is configured by the method below:
>
> public static KafkaSpout formKafkaSpoutForThisTopic(String zkConnString,
>         String topicName) {
>     BrokerHosts hosts = new ZkHosts(zkConnString);
>     SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName,
>             "/" + topicName, topicName);
>     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>     return new KafkaSpout(spoutConfig);
> }
>
> What is the best mechanism to find out the current offsets in the topic
> for each partition and its consumers?
>
> I tried various ways, with no luck.
>
> I am very much interested in contributing to Storm project.
>
> Also, please tell me the best place to start contributing.
>
> Thanks,
> Venkat
>
>
>