In Kafka 2.0, the default offsets retention period was changed to 7 days [a],
and better expiration semantics for consumer group offsets were added as part
of KIP-211 [b].


a - https://kafka.apache.org/documentation/#upgrade_200_notable
b - https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
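
As a rough cross-check of the one-day default against the __consumer_offsets
dump quoted below, here is a minimal Python sketch. It assumes the pre-KIP-211
behaviour in which the expiration clock starts at the last commit. The record
is copied from the dump; the crash time is a made-up illustrative value (the
thread only says "last Friday"), and the helper names are hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone

# One record copied from the __consumer_offsets dump quoted in this thread.
RECORD = (
    "[spark-job-consumer-group,spark-job-reading-topic,3]::"
    "[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778505013,"
    "ExpirationTime 1534864905013]"
)

# Both timestamps are milliseconds since the Unix epoch.
PATTERN = re.compile(r"CommitTime (\d+),\s*ExpirationTime (\d+)")


def retention_minutes(record: str) -> float:
    """Return the gap between commit and expiration time, in minutes."""
    match = PATTERN.search(record)
    if match is None:
        raise ValueError("no CommitTime/ExpirationTime pair in record")
    commit_ms, expire_ms = (int(group) for group in match.groups())
    return (expire_ms - commit_ms) / 60_000


def offsets_expired(last_commit: datetime, restart: datetime,
                    retention_min: float) -> bool:
    """True if the restart happens after the retention window has elapsed.

    Assumption: retention is measured from the last commit, regardless of
    whether the group still exists (pre-KIP-211 semantics).
    """
    return restart - last_commit > timedelta(minutes=retention_min)


minutes = retention_minutes(RECORD)
print(minutes)  # 1440.0, i.e. one day

# Hypothetical crash time (assumption, not from the thread): Friday noon UTC.
crash = datetime(2018, 8, 17, 12, 0, tzinfo=timezone.utc)
# Restart time taken from the first CommitTime seen after the fix.
restart = datetime.fromtimestamp(1534778505013 / 1000, tz=timezone.utc)

print(offsets_expired(crash, restart, minutes))      # one-day retention
print(offsets_expired(crash, restart, 7 * 24 * 60))  # seven-day retention
```

With a one-day window, a job that stays down over a weekend comes back after
its committed offsets have expired, so auto.offset.reset=latest takes over;
with a seven-day window the same outage would be covered.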


On Mon, Aug 27, 2018 at 7:26 PM, Biplob Biswas <revolutioni...@gmail.com>
wrote:

> Hi Manikumar,
>
> Thanks for the reply. It seems you hit the right nerve: we never set this
> property, and if Kafka is using the default value, then the consumer
> group's offsets would expire after a day, which completely explains the
> behaviour we saw.
>
> Thanks a lot.  We will test this and verify and report back.
>
> Thanks & Regards
> Biplob Biswas
>
>
> On Mon, Aug 27, 2018 at 10:12 AM Manikumar <manikumar.re...@gmail.com>
> wrote:
>
> > What value is set for offsets.retention.minutes? The default is 1 day
> > (1440 minutes) in the Kafka 0.11 release.
> > Based on your requirement, you may need to increase this value.
> >
> > On Mon, Aug 27, 2018 at 1:33 PM Biplob Biswas <revolutioni...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > > Can anyone provide any insights into the issue we faced?
> > >
> > > Thanks & Regards
> > > Biplob Biswas
> > >
> > >
> > > > On Fri, Aug 24, 2018 at 4:50 PM Biplob Biswas <revolutioni...@gmail.com>
> > > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > *Short Version:*
> > > > > We had an unusual situation where, after a restart of our Spark
> > > > > job, rather than reading from the last known offset of the
> > > > > consumer group, it started consuming from the latest offset, thus
> > > > > leading to data loss.
> > > >
> > > > *Long Version:*
> > > > > We have a Spark job which crashed last Friday due to an NPE
> > > > > introduced by developer code. The job was not restarted over the
> > > > > weekend; on Monday the bug was fixed and the job was restarted.
> > > > > The consumer group was *NOT* changed, but when we restarted the
> > > > > job, it started reading from the latest offset.
> > > >
> > > > > We started investigating and verified our Spark job
> > > > > configuration, which looks good. We tried to reproduce the issue
> > > > > by killing the job and restarting it while new events were coming
> > > > > in, and it read the buffered data properly. We also verified that
> > > > > the consumer group had a valid lag which was reducing over time.
> > > >
> > > > > Next we checked the __consumer_offsets topic and found something
> > > > > which could explain why the Spark job started reading from
> > > > > latest. We found the following lines, which had not been
> > > > > compacted by the topic itself:
> > > >
> > > > [spark-job-consumer-group,spark-job-reading-topic,1]::NULL
> > > >> [spark-job-consumer-group,spark-job-reading-topic,3]::NULL
> > > >> [spark-job-consumer-group,spark-job-reading-topic,0]::NULL
> > > >> [spark-job-consumer-group,spark-job-reading-topic,2]::NULL
> > > >
> > > > --
> > > > > The following offset commits came after we started ingesting
> > > > > again on Monday after the bug fix:
> > > >
> > > >
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,3]::[
> OffsetMetadata[606979,NO_METADATA],CommitTime
> > > >> 1534778505013,ExpirationTime 1534864905013]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,1]::[
> OffsetMetadata[607195,NO_METADATA],CommitTime
> > > >> 1534778505013,ExpirationTime 1534864905013]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,2]::[
> OffsetMetadata[607100,NO_METADATA],CommitTime
> > > >> 1534778505013,ExpirationTime 1534864905013]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,0]::[
> OffsetMetadata[607330,NO_METADATA],CommitTime
> > > >> 1534778505013,ExpirationTime 1534864905013]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,3]::[
> OffsetMetadata[606979,NO_METADATA],CommitTime
> > > >> 1534778508010,ExpirationTime 1534864908010]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,1]::[
> OffsetMetadata[607195,NO_METADATA],CommitTime
> > > >> 1534778508010,ExpirationTime 1534864908010]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,2]::[
> OffsetMetadata[607100,NO_METADATA],CommitTime
> > > >> 1534778508010,ExpirationTime 1534864908010]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,0]::[
> OffsetMetadata[607330,NO_METADATA],CommitTime
> > > >> 1534778508010,ExpirationTime 1534864908010]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,3]::[
> OffsetMetadata[606979,NO_METADATA],CommitTime
> > > >> 1534778511010,ExpirationTime 1534864911010]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,1]::[
> OffsetMetadata[607195,NO_METADATA],CommitTime
> > > >> 1534778511010,ExpirationTime 1534864911010]
> > > >>
> > >
> > [spark-job-consumer-group,spark-job-reading-topic,2]::[
> OffsetMetadata[607100,NO_METADATA],CommitTime
> > > >> 1534778511010,ExpirationTime 1534864911010]
> > > >
> > > >
> > > >
> > > > > But this only showed how the behaviour came about, not its root
> > > > > cause, so we looked into the Kafka logs. Between the time our job
> > > > > died and the time we restarted it, we found a few exceptions.
> > > > > They don't look like they could be the root cause, but I still
> > > > > wanted to make sure:
> > > >
> > > >>
> > > >> 2018-08-20 09:01:20,016 INFO kafka.controller.KafkaController:
> > > >> [Controller 180]: Starting preferred replica leader election for
> > > partitions
> > > >> [__consumer_offsets,2]
> > > >> 2018-08-20 09:01:20,016 INFO kafka.controller.
> PartitionStateMachine:
> > > >> [Partition state machine on Controller 180]: Invoking state change
> to
> > > >> OnlinePartition for partitions [__consumer_offsets,2]
> > > >> 2018-08-20 09:01:20,019 INFO
> > > >> kafka.controller.PreferredReplicaPartitionLeaderSelector:
> > > >> [PreferredReplicaPartitionLeaderSelector]: Current leader 180 for
> > > partition
> > > >> [__consumer_offsets,2] is not the preferred replica. Triggering
> > > preferred
> > > >> replica leader election
> > > >> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180
> > epoch
> > > >> 37 encountered error while electing leader for partition
> > > >> [__consumer_offsets,2] due to: Preferred replica 182 for partition
> > > >> [__consumer_offsets,2] is either not alive or not in the isr.
> Current
> > > >> leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
> > > >> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180
> > epoch
> > > >> 37 initiated state change for partition [__consumer_offsets,2] from
> > > >> OnlinePartition to OnlinePartition failed
> > > >> kafka.common.StateChangeFailedException: encountered error while
> > > electing
> > > >> leader for partition [__consumer_offsets,2] due to: Preferred
> replica
> > > 182
> > > >> for partition [__consumer_offsets,2] is either not alive or not in
> the
> > > isr.
> > > >> Current leader and ISR:
> > > [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
> > > >>         at
> > > >>
> > >
> > kafka.controller.PartitionStateMachine.electLeaderForPartition(
> PartitionStateMachine.scala:330)
> > > >>         at
> > > >>
> > >
> > kafka.controller.PartitionStateMachine.kafka$controller$
> PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:166)
> > > >>         at
> > > >>
> > >
> > kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.
> apply(PartitionStateMachine.scala:111)
> > > >>         at
> > > >>
> > >
> > kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.
> apply(PartitionStateMachine.scala:110)
> > > >>         at scala.collection.immutable.
> Set$Set1.foreach(Set.scala:94)
> > > >>         at
> > > >>
> > >
> > kafka.controller.PartitionStateMachine.handleStateChanges(
> PartitionStateMachine.scala:110)
> > > >>         at
> > > >>
> > >
> > kafka.controller.KafkaController.onPreferredReplicaElection(
> KafkaController.scala:581)
> > > >>         at
> > > >>
> > >
> > kafka.controller.KafkaController$$anonfun$kafka$controller$
> KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.
> apply(KafkaController.scala:1126)
> > > >>         at
> > > >>
> > >
> > kafka.controller.KafkaController$$anonfun$kafka$controller$
> KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.
> apply(KafkaController.scala:1119)
> > > >>         at
> > > >>
> > >
> > scala.collection.mutable.HashMap$$anon$1$$anonfun$
> foreach$2.apply(HashMap.scala:134)
> > > >>         at
> > > >>
> > >
> > scala.collection.mutable.HashMap$$anon$1$$anonfun$
> foreach$2.apply(HashMap.scala:134)
> > > >>         at
> > > >>
> > >
> > scala.collection.mutable.HashTable$class.foreachEntry(
> HashTable.scala:236)
> > > >>         at
> > > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> > > >>         at
> > > >> scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
> > > >>         at
> > > >>
> > >
> > kafka.controller.KafkaController$$anonfun$kafka$controller$
> KafkaController$$checkAndTriggerAutoLeaderRebal
> ance$3.apply(KafkaController.scala:1119)
> > > >>         at
> > > >>
> > >
> > kafka.controller.KafkaController$$anonfun$kafka$controller$
> KafkaController$$checkAndTriggerAutoLeaderRebal
> ance$3.apply(KafkaController.scala:1106)
> > > >>         at scala.collection.immutable.Map$Map3.foreach(Map.scala:
> 161)
> > > >>         at
> > > >>
> > >
> > kafka.controller.KafkaController.kafka$controller$KafkaController$$
> checkAndTriggerAutoLeaderRebalance(KafkaController.scala:1106)
> > > >>         at
> > > >>
> > >
> > kafka.controller.KafkaController$AutoPreferredReplicaLeaderElec
> tion$.process(KafkaController.scala:1380)
> > > >>         at
> > > >>
> > >
> > kafka.controller.ControllerEventManager$ControllerEventThread$$
> anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:50)
> > > >>         at
> > > >>
> > >
> > kafka.controller.ControllerEventManager$ControllerEventThread$$
> anonfun$doWork$1.apply(ControllerEventManager.scala:50)
> > > >>         at
> > > >>
> > >
> > kafka.controller.ControllerEventManager$ControllerEventThread$$
> anonfun$doWork$1.apply(ControllerEventManager.scala:50)
> > > >>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> > > >>         at
> > > >>
> > >
> > kafka.controller.ControllerEventManager$ControllerEventThread.doWork(
> ControllerEventManager.scala:49)
> > > >>         at
> > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> > > >> Caused by: kafka.common.StateChangeFailedException: Preferred
> replica
> > > 182
> > > >> for partition [__consumer_offsets,2] is either not alive or not in
> the
> > > isr.
> > > >> Current leader and ISR:
> > > [{"leader":180,"leader_epoch":2,"isr":[181,180]}]
> > > >>         at
> > > >>
> > >
> > kafka.controller.PreferredReplicaPartitionLeaderSelector.selectLeader(
> PartitionLeaderSelector.scala:157)
> > > >>         at
> > > >>
> > >
> > kafka.controller.PartitionStateMachine.electLeaderForPartition(
> PartitionStateMachine.scala:308)
> > > >>         ... 24 more
> > > >> 2018-08-20 09:01:20,020 WARN kafka.controller.KafkaController:
> > > >> [Controller 180]: Partition [__consumer_offsets,2] failed to
> complete
> > > >> preferred replica leader election. Leader is 180
> > > >> 2018-08-20 09:01:24,396 INFO kafka.server.ReplicaFetcherThread:
> > > >> [ReplicaFetcherThread-0-182]: Stopped
> > > >> 2018-08-20 09:01:24,397 INFO kafka.server.ReplicaFetcherThread:
> > > >> [ReplicaFetcherThread-0-182]: Shutdown completed
> > > >
> > > >
> > > >
> > > > > Our Kafka is part of the Cloudera Distribution, *CDH 5.13.1*; the
> > > > > Kafka version running is *0.11.0-kafka-3.0.0*. It may also be
> > > > > important to note that we commit the offsets manually after
> > > > > processing each batch of data within the Spark job. A few
> > > > > important Kafka properties are as follows:
> > > >
> > > >>
> > > > >> *group.id=spark-job-consumer-group*
> > > > >> *auto.offset.reset=latest*
> > > > >> *enable.auto.commit=false*
> > > >
> > > >
> > > > > We still don't have a root cause for the issue, and it's
> > > > > worrisome, as this behaviour in production is not really
> > > > > acceptable. Any help or pointers would be really appreciated.
> > > > > Also, please let me know if you need any more information from my
> > > > > side.
> > > >
> > > > Thanks & Regards
> > > > Biplob Biswas
> > > >
> > >
> >
>
