What is the value set for offsets.retention.minutes? The default is 1440 minutes (1 day) in the Kafka 0.11 release, and the ExpirationTime values in your __consumer_offsets dump are exactly 24 hours after the CommitTime, which matches that default. If a consumer group commits nothing for longer than the retention window (for example over a weekend while the job is down), the broker expires the committed offsets and writes tombstones (the NULL entries you saw), and on the next start the consumer falls back to auto.offset.reset, which is `latest` in your case. Based on your requirement, you may need to increase this value.
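
If it does turn out to be an offset-expiry problem, here is a minimal sketch of the change, assuming you apply it in each broker's server.properties (or the equivalent Cloudera Manager setting); the value below is only an example, and as far as I know this is a static broker config in 0.11, so it needs a rolling restart to take effect:

    # keep committed offsets for 7 days instead of the 1-day default
    offsets.retention.minutes=10080
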
On Mon, Aug 27, 2018 at 1:33 PM Biplob Biswas <revolutioni...@gmail.com> wrote:

> Hi,
>
> Can anyone provide any insights into the issue we faced?
>
> Thanks & Regards
> Biplob Biswas
>
>
> On Fri, Aug 24, 2018 at 4:50 PM Biplob Biswas <revolutioni...@gmail.com> wrote:
>
> > Hi everyone,
> >
> > *Short Version:*
> > We had an unusual situation where, after a restart of our Spark job, instead of resuming from the last known offsets of the consumer group, it started consuming from the latest offset, leading to data loss.
> >
> > *Long Version:*
> > We have a Spark job which crashed last Friday due to an NPE introduced by developer code. The job was not restarted over the weekend; on Monday the bug was fixed and the job was restarted. The consumer group was *NOT* changed, but when we restarted the job, it started reading from the latest offset.
> >
> > We started investigating and verified our Spark job configuration, which looks good. We tried to reproduce the issue by killing the job and restarting it while new events were coming in, and it read the buffered data properly. We also verified that the consumer group had a valid lag which was decreasing over time.
> >
> > Next we checked the __consumer_offsets topic and found something which could explain why the Spark job started reading from latest. We found the following entries, which had not yet been compacted away by the topic itself:
> >
> >> [spark-job-consumer-group,spark-job-reading-topic,1]::NULL
> >> [spark-job-consumer-group,spark-job-reading-topic,3]::NULL
> >> [spark-job-consumer-group,spark-job-reading-topic,0]::NULL
> >> [spark-job-consumer-group,spark-job-reading-topic,2]::NULL
> >
> > The following offset commits came after we started ingesting again on Monday, after the bug fix:
> >
> >> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> >> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> >> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> >> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> >> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> >> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> >> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> >> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> >> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
> >> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
> >> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
> >
> > But this only shows the reason for the behaviour, not the cause, so we looked into the Kafka logs. During the time between when our Spark job died and when we restarted it, we found a few exceptions. They don't look like they could be the root cause, but I still wanted to make sure:
> >
> >> 2018-08-20 09:01:20,016 INFO kafka.controller.KafkaController: [Controller 180]: Starting preferred replica leader election for partitions [__consumer_offsets,2]
> >> 2018-08-20 09:01:20,016 INFO kafka.controller.PartitionStateMachine: [Partition state machine on Controller 180]: Invoking state change to OnlinePartition for partitions [__consumer_offsets,2]
> >> 2018-08-20 09:01:20,019 INFO kafka.controller.PreferredReplicaPartitionLeaderSelector: [PreferredReplicaPartitionLeaderSelector]: Current leader 180 for partition [__consumer_offsets,2] is not the preferred replica. Triggering preferred replica leader election
> >> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch 37 encountered error while electing leader for partition [__consumer_offsets,2] due to: Preferred replica 182 for partition [__consumer_offsets,2] is either not alive or not in the isr. Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
> >> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch 37 initiated state change for partition [__consumer_offsets,2] from OnlinePartition to OnlinePartition failed
> >> kafka.common.StateChangeFailedException: encountered error while electing leader for partition [__consumer_offsets,2] due to: Preferred replica 182 for partition [__consumer_offsets,2] is either not alive or not in the isr. Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
> >>   at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:330)
> >>   at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:166)
> >>   at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
> >>   at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
> >>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
> >>   at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
> >>   at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:581)
> >>   at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1126)
> >>   at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1119)
> >>   at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
> >>   at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
> >>   at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> >>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> >>   at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
> >>   at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1119)
> >>   at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1106)
> >>   at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
> >>   at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance(KafkaController.scala:1106)
> >>   at kafka.controller.KafkaController$AutoPreferredReplicaLeaderElection$.process(KafkaController.scala:1380)
> >>   at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:50)
> >>   at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
> >>   at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
> >>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> >>   at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:49)
> >>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> >> Caused by: kafka.common.StateChangeFailedException: Preferred replica 182 for partition [__consumer_offsets,2] is either not alive or not in the isr. Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}]
> >>   at kafka.controller.PreferredReplicaPartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:157)
> >>   at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:308)
> >>   ... 24 more
> >> 2018-08-20 09:01:20,020 WARN kafka.controller.KafkaController: [Controller 180]: Partition [__consumer_offsets,2] failed to complete preferred replica leader election. Leader is 180
> >> 2018-08-20 09:01:24,396 INFO kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-0-182]: Stopped
> >> 2018-08-20 09:01:24,397 INFO kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-0-182]: Shutdown completed
> >
> > Our Kafka is part of the Cloudera Distribution (*CDH 5.13.1*); the Kafka version running is *0.11.0-kafka-3.0.0*. It may also be important to note that we are manually committing the offsets after processing each batch of data within the Spark job. A few important Kafka properties are as follows:
> >
> >> group.id=spark-job-consumer-group
> >> auto.offset.reset=latest
> >> enable.auto.commit=false
> >
> > We still don't have a root cause for the issue, and it's worrisome as this behaviour in production is not really acceptable. Any help or pointers would be really appreciated. Also, please let me know if you need any more information from my side.
> >
> > Thanks & Regards
> > Biplob Biswas
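
Since you mentioned that you commit manually after processing each batch: below is a minimal sketch of how that is usually wired up with spark-streaming-kafka-0-10, assuming a direct stream; the broker address is a placeholder and the topic/group names are just the ones from your mail. The part relevant to your incident is that commitAsync only writes offsets back to __consumer_offsets, so once those commits expire under offsets.retention.minutes there is nothing left to resume from and auto.offset.reset=latest kicks in, exactly as you observed.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object ManualCommitSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("manual-commit-sketch"), Seconds(30))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker-1:9092",              // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-job-consumer-group",
      // only consulted when the group has no (unexpired) committed offsets
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("spark-job-reading-topic"), kafkaParams))

    stream.foreachRDD { rdd =>
      // capture this batch's offset ranges before any transformation
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process the batch here ...

      // commit to Kafka only after the batch succeeded, so a restart
      // resumes from the last committed offsets instead of `latest`
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}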