Hi Manikumar,

Thanks for the reply. It seems you have hit on the likely cause: we never set
this property, and if Kafka is using the default value, the committed offsets
of the consumer group would expire after a day, which completely explains
our behaviour.
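
As a quick sanity check, here is a minimal sketch that reads the CommitTime and ExpirationTime fields from two of the offset records quoted further down and computes the retention window they imply. The helper name `retention_minutes` is only for illustration, and the regular expression assumes the record layout shown in the dump.

```python
import re

# Two of the offset records from the __consumer_offsets dump quoted below.
RECORDS = [
    "[spark-job-consumer-group,spark-job-reading-topic,3]::"
    "[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778505013,"
    "ExpirationTime 1534864905013]",
    "[spark-job-consumer-group,spark-job-reading-topic,0]::"
    "[OffsetMetadata[607330,NO_METADATA],CommitTime 1534778508010,"
    "ExpirationTime 1534864908010]",
]

# Both timestamps are epoch milliseconds.
TIMES = re.compile(r"CommitTime (\d+),ExpirationTime (\d+)")


def retention_minutes(record: str) -> float:
    """Return the gap between commit and expiration time, in minutes."""
    match = TIMES.search(record)
    if match is None:
        raise ValueError("record has no CommitTime/ExpirationTime fields")
    commit_ms, expire_ms = (int(value) for value in match.groups())
    return (expire_ms - commit_ms) / 60_000


for record in RECORDS:
    print(retention_minutes(record))  # 1440.0 for both records
```

Each record expires exactly 1440 minutes (one day) after it was committed, which is consistent with the default offsets.retention.minutes being in effect.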

Thanks a lot. We will test this and report back.

Thanks & Regards
Biplob Biswas


On Mon, Aug 27, 2018 at 10:12 AM Manikumar <manikumar.re...@gmail.com>
wrote:

> What is the value set for offsets.retention.minutes?  The default value is
> 1 day (1440 minutes) in the Kafka 0.11 release.
> Based on your requirement, you may need to increase this value.
>
> On Mon, Aug 27, 2018 at 1:33 PM Biplob Biswas <revolutioni...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Can anyone provide any insights to the issue we faced?
> >
> > Thanks & Regards
> > Biplob Biswas
> >
> >
> > On Fri, Aug 24, 2018 at 4:50 PM Biplob Biswas <revolutioni...@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > *Short Version:*
> > > We had an unusual situation where, after a restart of our Spark job, rather
> > > than reading from the last known offset of the consumer group, it started
> > > consuming from the latest offset, thus leading to data loss.
> > >
> > > *Long Version:*
> > > We have a Spark job which crashed last Friday due to an NPE introduced by
> > > developer code. The job was not restarted over the weekend; on Monday the
> > > bug was fixed and the job was restarted. The consumer group was *NOT*
> > > changed, but when we restarted the job, it started reading from the
> > > latest offset.
> > >
> > > We started investigating and verified our Spark job configuration, which
> > > looks good. We tried to reproduce the issue by killing the job and
> > > restarting it while new events were coming in, and it read the buffered
> > > data properly. We also verified that the consumer group had a valid lag
> > > which was reducing over time.
> > >
> > > Next we checked the __consumer_offsets topic and found something which
> > > could explain why the Spark job started reading from latest. We found the
> > > following lines, which had not been compacted by the topic itself:
> > >
> > >> [spark-job-consumer-group,spark-job-reading-topic,1]::NULL
> > >> [spark-job-consumer-group,spark-job-reading-topic,3]::NULL
> > >> [spark-job-consumer-group,spark-job-reading-topic,0]::NULL
> > >> [spark-job-consumer-group,spark-job-reading-topic,2]::NULL
> > >
> > > --
> > > The following offset commits came after we started ingesting again on
> > > Monday after the bug fix:
> > >
> > >> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> > >> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> > >> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> > >> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> > >> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> > >> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> > >> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> > >> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> > >> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
> > >> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
> > >> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
> > >
> > >
> > >
> > > But this only showed how the behaviour came about, not its cause, so we
> > > looked into the Kafka logs. For the period between when our job died and
> > > when we restarted it, we found a few exceptions. These don't look like
> > > they could be the root cause, but I still wanted to make sure:
> > >
> > >>
> > >> 2018-08-20 09:01:20,016 INFO kafka.controller.KafkaController: [Controller 180]: Starting preferred replica leader election for partitions [__consumer_offsets,2]
> > >> 2018-08-20 09:01:20,016 INFO kafka.controller.PartitionStateMachine: [Partition state machine on Controller 180]: Invoking state change to OnlinePartition for partitions [__consumer_offsets,2]
> > >> 2018-08-20 09:01:20,019 INFO kafka.controller.PreferredReplicaPartitionLeaderSelector: [PreferredReplicaPartitionLeaderSelector]: Current leader 180 for partition [__consumer_offsets,2] is not the preferred replica. Triggering preferred replica leader election
> > >> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch 37 encountered error while electing leader for partition [__consumer_offsets,2] due to: Preferred replica 182 for partition [__consumer_offsets,2] is either not alive or not in the isr. Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
> > >> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch 37 initiated state change for partition [__consumer_offsets,2] from OnlinePartition to OnlinePartition failed
> > >> kafka.common.StateChangeFailedException: encountered error while electing leader for partition [__consumer_offsets,2] due to: Preferred replica 182 for partition [__consumer_offsets,2] is either not alive or not in the isr. Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
> > >>         at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:330)
> > >>         at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:166)
> > >>         at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
> > >>         at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
> > >>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
> > >>         at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
> > >>         at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:581)
> > >>         at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1126)
> > >>         at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1119)
> > >>         at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
> > >>         at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
> > >>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> > >>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> > >>         at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
> > >>         at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1119)
> > >>         at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1106)
> > >>         at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
> > >>         at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance(KafkaController.scala:1106)
> > >>         at kafka.controller.KafkaController$AutoPreferredReplicaLeaderElection$.process(KafkaController.scala:1380)
> > >>         at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:50)
> > >>         at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
> > >>         at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
> > >>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> > >>         at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:49)
> > >>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> > >> Caused by: kafka.common.StateChangeFailedException: Preferred replica 182 for partition [__consumer_offsets,2] is either not alive or not in the isr. Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}]
> > >>         at kafka.controller.PreferredReplicaPartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:157)
> > >>         at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:308)
> > >>         ... 24 more
> > >> 2018-08-20 09:01:20,020 WARN kafka.controller.KafkaController: [Controller 180]: Partition [__consumer_offsets,2] failed to complete preferred replica leader election. Leader is 180
> > >> 2018-08-20 09:01:24,396 INFO kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-0-182]: Stopped
> > >> 2018-08-20 09:01:24,397 INFO kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-0-182]: Shutdown completed
> > >
> > >
> > >
> > > Our Kafka is part of the Cloudera Distribution, *CDH 5.13.1*, and the
> > > version of Kafka running is *0.11.0-kafka-3.0.0*. It may also be
> > > important to note that we are manually committing the offsets after
> > > processing each batch of data within the Spark job. A few important Kafka
> > > properties are as follows:
> > >
> > >>
> > >> *group.id=spark-job-consumer-group*
> > >> *auto.offset.reset=latest*
> > >> *enable.auto.commit=false*
> > >
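
For context, here is a minimal sketch of why these settings matter once the committed offsets are gone. It assumes the simplified rule that a consumer resumes from its committed offset when one exists and otherwise falls back to auto.offset.reset. The function name `starting_offset` and the log start/end values are hypothetical, and the real client also handles cases (such as out-of-range offsets) that are ignored here.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Simplified model of where a consumer starts reading a partition."""
    # A committed offset for the group always wins.
    if committed is not None:
        return committed
    # No committed offset (for example, it expired): apply the reset policy.
    if auto_offset_reset == "latest":
        return log_end
    if auto_offset_reset == "earliest":
        return log_start
    # With the policy "none" the real consumer raises an error instead.
    raise ValueError("no committed offset and no usable reset policy")


# Hypothetical partition bounds, for illustration only.
LOG_START, LOG_END = 0, 700_000

# Quick restart: the committed offset 606979 is still present.
print(starting_offset(606979, LOG_START, LOG_END))  # 606979

# Restart after the offsets expired: nothing committed, so jump to the end.
print(starting_offset(None, LOG_START, LOG_END))  # 700000
```

In this model, everything between the old committed offset and the log end is skipped in the second case, which matches the data loss described above.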
> > >
> > > We still don't have a root cause for the issue, and it's worrisome as this
> > > behaviour in production is not really acceptable. Any help or pointers
> > > would be really appreciated.
> > > Also, please let me know if you need any more information from my side.
> > >
> > > Thanks & Regards
> > > Biplob Biswas
> > >
> >
>
