Thank you. I am using the same groupId all the time. I printed the OffsetsMessageFormatter output for the consumer, and it shows:

[async_force_consumers,force_msgs,9]::OffsetAndMetadata[2,associated metadata,1430277791077]

But if I restart the consumer, it starts consuming messages from offset 1 for partition 9, even though I have stored the offset as 2. I am not sure what I am missing here.

Thanks & Regards,

On Wed, Apr 29, 2015 at 5:17 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> OK, so you turned off auto.offset.commit, and set the auto.offset.reset to
> largest.
>
> That means when you consume,
> 1. If you did not commit offsets manually, no offsets will be committed to
> Kafka.
> 2. If you do not have an offset stored in Kafka, you will start from the
> log end and ignore the existing messages in the topic.
>
> Another thing you want to check is that are you using the group Id all the
> time?
>
> Jiangjie (Becket) Qin
>
> On 4/29/15, 3:17 PM, "Gomathivinayagam Muthuvinayagam"
> <sankarm...@gmail.com> wrote:
>
> >I am using Kafka 0.8.2 and I am using Kafka based storage for offset.
> >Whenever I restart a consumer (high level consumer api) it is not
> >consuming messages whichever were posted when the consumer was down.
> >
> >I am using the following consumer properties
> >
> >    Properties props = new Properties();
> >    props.put("zookeeper.connect", zooKeeper);
> >    props.put("group.id", consumerName);
> >    props.put("zookeeper.session.timeout.ms", "6000");
> >    props.put("zookeeper.sync.time.ms", "200");
> >    props.put("auto.commit.enable", "false");
> >    props.put("offsets.storage", "kafka");
> >    props.put("dual.commit.enabled", "false");
> >    props.put("auto.offset.reset", "largest");
> >
> >My offset manager is here
> >https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why
> >the consumer is behaving weird. Please share any updates if you have.
> >
> >Thanks & Regards,
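
To double-check which group, topic, partition and offset a formatter line refers to, the line can be split mechanically. The following is only a minimal sketch: it assumes the output always has the shape [group,topic,partition]::OffsetAndMetadata[offset,metadata,timestamp], as in the single line quoted in this thread. The class name OffsetLine and its fields are hypothetical and are not part of Kafka.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: parses one line of
// OffsetsMessageFormatter output, assuming the shape seen in this thread.
class OffsetLine {
    final String group;
    final String topic;
    final int partition;
    final long offset;
    final String metadata;
    final long timestamp;

    // [group,topic,partition]::OffsetAndMetadata[offset,metadata,timestamp]
    private static final Pattern LINE = Pattern.compile(
        "\\[([^,\\]]+),([^,\\]]+),(\\d+)\\]::OffsetAndMetadata\\[(\\d+),(.*),(\\d+)\\]");

    private OffsetLine(Matcher m) {
        this.group = m.group(1);
        this.topic = m.group(2);
        this.partition = Integer.parseInt(m.group(3));
        this.offset = Long.parseLong(m.group(4));
        this.metadata = m.group(5);
        this.timestamp = Long.parseLong(m.group(6));
    }

    // Throws IllegalArgumentException if the line does not match the assumed shape.
    static OffsetLine parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unrecognized formatter line: " + line);
        }
        return new OffsetLine(m);
    }

    public static void main(String[] args) {
        OffsetLine r = parse(
            "[async_force_consumers,force_msgs,9]::OffsetAndMetadata[2,associated metadata,1430277791077]");
        System.out.println("group=" + r.group + " topic=" + r.topic
            + " partition=" + r.partition + " committed offset=" + r.offset);
    }
}
```

Comparing the parsed group against the consumer's group.id, and the parsed offset against the first offset the consumer reads after a restart, makes it easier to see whether the committed entry is the one actually being used.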