I checked the __consumer_offsets topic, and here is an extraction from that log for the same consumer group, a specific topic (users), and a specific partition (15):
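(For reference, an extraction like this can be produced with the console consumer that ships with Kafka. A sketch for 0.11 — the broker address is a placeholder, the internal topic has to be made visible to the consumer, and the formatter class lives in the kafka.coordinator.group package as of 0.11, so double-check it against your exact broker version:)

  ./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic __consumer_offsets --from-beginning \
    --consumer-property exclude.internal.topics=false \
    --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"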
[storm_kafka_topology,users,15]::[OffsetMetadata[8327,{topic-partition=users-15, offset=8327, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230031557,ExpirationTime 1503316431557]
[storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230332504,ExpirationTime 1503316732504]
[storm_kafka_topology,users,15]::[OffsetMetadata[6512,{topic-partition=users-15, offset=6512, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230748612,ExpirationTime 1503317148612]
[storm_kafka_topology,users,15]::[OffsetMetadata[8172,{topic-partition=users-15, offset=8172, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230791209,ExpirationTime 1503317191209]
[storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230821337,ExpirationTime 1503317221337]
[storm_kafka_topology,users,15]::[OffsetMetadata[8333,{topic-partition=users-15, offset=8333, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231513311,ExpirationTime 1503317913311]
[storm_kafka_topology,users,15]::[OffsetMetadata[8338,{topic-partition=users-15, offset=8338, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231603513,ExpirationTime 1503318003513]
[storm_kafka_topology,users,15]::[OffsetMetadata[8344,{topic-partition=users-15, offset=8344, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231693829,ExpirationTime 1503318093829]

We can see here that the consumer was at offset 8330 on Sunday, August 20, 2017 at 11:58:52.504 AM (CommitTime 1503230332504) and back at offset 6512 a few minutes later (the Kafka restart occurred at this time).
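(The group's committed offsets can also be checked per partition with the consumer-groups tool, which reports the current committed offset, the log-end offset and the lag. The broker address below is a placeholder; the group name is the one from the log above:)

  ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group storm_kafka_topology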
On Tue, Aug 22, 2017 at 12:31 AM, Elyahou Ittah <elyaho...@fiverr.com> wrote:

> The topology is working well and committing offsets for some time, and I
> also restarted it and saw it start from the last committed offset. I saw
> the issue only at the Kafka restart.
>
> I have two ZooKeepers and they were not restarted.
>
> On Tue, Aug 22, 2017 at 12:24 AM, Stig Rohde Døssing <s...@apache.org>
> wrote:
>
>> I think the __consumer_offsets configuration looks fine, I just wanted to
>> be sure there wasn't only one replica. How many ZooKeepers do you have,
>> and were they restarted as well?
>>
>> I would suspect that the spout isn't committing properly for some reason.
>> The default behavior is to get the committed offset from Kafka when
>> starting, and if it is present it is used. If it isn't there, the spout
>> starts over from the beginning of the partitions. You can check whether
>> the spout is committing by enabling debug logging for the classes in
>> storm-kafka-client, to check logs like this one:
>> https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L118
>>
>> 2017-08-21 22:20 GMT+02:00 Elyahou Ittah <elyaho...@fiverr.com>:
>>
>>> Hi Stig,
>>>
>>> I don't have these kinds of errors normally. They just occurred at the
>>> rolling restart of Kafka.
>>>
>>> Also, the __consumer_offsets configuration is:
>>>
>>> Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3
>>> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
>>> Topic: __consumer_offsets Partition: 0 Leader: 0 Replicas: 2,1,0 Isr: 0,1
>>> ...
>>>
>>> Can the fact that the replication factor is 3 even though there are only
>>> two brokers cause an issue?
>>>
>>> On Mon, Aug 21, 2017 at 6:51 PM, Stig Rohde Døssing <s...@apache.org>
>>> wrote:
>>>
>>>> The spout will re-emit some messages if it fails to commit offsets to
>>>> Kafka. Do these CommitFailedExceptions normally occur in your logs?
>>>>
>>>> Also, since the spout stores offsets in Kafka, you may want to check
>>>> the replication factor on that topic by running `./kafka-topics.sh
>>>> --zookeeper localhost:2181 --describe --topic __consumer_offsets` in
>>>> one of your Kafka /bin directories.
>>>>
>>>> 2017-08-21 17:17 GMT+02:00 Elyahou Ittah <elyaho...@fiverr.com>:
>>>>
>>>>> The config is the default one, I just set the bootstrap server.
>>>>>
>>>>> Kafka version is 0.11.
>>>>>
>>>>> storm-kafka-client is 1.1.0.
>>>>>
>>>>> On Mon, Aug 21, 2017 at 5:48 PM, Stig Rohde Døssing <s...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Elyahou,
>>>>>>
>>>>>> Could you post your spout configuration, Kafka version and
>>>>>> storm-kafka-client version? The logs imply that your spouts are not
>>>>>> polling often enough.
>>>>>>
>>>>>> 2017-08-21 9:44 GMT+02:00 Elyahou Ittah <elyaho...@fiverr.com>:
>>>>>>
>>>>>>> I noticed that the storm-kafka-client spout re-consumes all Kafka
>>>>>>> messages after a rolling restart of the Kafka cluster.
>>>>>>>
>>>>>>> This issue occurred only with the KafkaSpout consumer and not with
>>>>>>> my other consumers (Ruby-based, using the Kafka consumer API like
>>>>>>> KafkaSpout).
>>>>>>>
>>>>>>> Logs of the spout are attached.
>>>>>>>
>>>>>>> Do you know what can cause this kind of behavior?
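(A note on the debug-logging suggestion above: on Storm 1.x the log level can be raised temporarily at runtime via dynamic log levels, with no redeploy. A sketch — the topology name is a placeholder, and the trailing 300 is a timeout in seconds after which the level reverts:)

  ./bin/storm set_log_level my_topology -l org.apache.storm.kafka.spout=DEBUG:300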