Hi,

Can anyone provide any insights into the issue we faced?
Thanks & Regards
Biplob Biswas

On Fri, Aug 24, 2018 at 4:50 PM Biplob Biswas <revolutioni...@gmail.com> wrote:

Hi everyone,

*Short Version:*
We had an unusual situation where, after a restart of our Spark job, instead of resuming from the last known offset of its consumer group, the job started consuming from the latest offset, thus leading to data loss.

*Long Version:*
We have a Spark job which crashed last Friday due to an NPE introduced by developer code. The job was not restarted over the weekend; on Monday the bug was fixed and the job was restarted. The consumer group was *NOT* changed, yet after the restart the Spark job started reading from the latest offset.

We started investigating. We verified the Spark job configuration and it looks good. We tried to reproduce the issue by killing the job and restarting it while new events were coming in, and in that test it read the buffered data properly. We also verified that the consumer group had a valid lag which was reducing over time.

Next we checked the __consumer_offsets topic and found something which could explain why the Spark job started reading from latest. We found the following entries, which had not yet been compacted away by the topic itself:

    [spark-job-consumer-group,spark-job-reading-topic,1]::NULL
    [spark-job-consumer-group,spark-job-reading-topic,3]::NULL
    [spark-job-consumer-group,spark-job-reading-topic,0]::NULL
    [spark-job-consumer-group,spark-job-reading-topic,2]::NULL

The following offset commits came after we started ingesting again on Monday, after the bug fix:

    [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
    [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
    [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
    [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
    [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
    [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
    [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
    [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
    [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
    [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
    [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
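For reference, the check we ran against the group's stored offsets was roughly along the lines of the standalone-consumer sketch below (simplified, not our actual tooling; the broker address is a placeholder, and I am only showing the four partitions that appear above):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    object CheckCommittedOffsets {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092") // placeholder, not our real broker
        props.put("group.id", "spark-job-consumer-group")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        try {
          (0 to 3).foreach { p =>
            val tp = new TopicPartition("spark-job-reading-topic", p)
            // committed() returns null when the broker has no stored offset
            // for this group and partition
            val committed = consumer.committed(tp)
            val shown = Option(committed).map(_.offset.toString).getOrElse("no committed offset")
            println(s"$tp -> $shown")
          }
        } finally {
          consumer.close()
        }
      }
    }

As far as I understand, committed() returning null means the broker no longer has a stored offset for the group on that partition, which would line up with the NULL entries above.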
But this only shows the effect, not the cause, so we looked into the Kafka broker logs. In the window between when our job died and when we restarted it, we found a few exceptions. They don't look like they could be the root cause, but I still wanted to make sure:

    2018-08-20 09:01:20,016 INFO kafka.controller.KafkaController: [Controller 180]: Starting preferred replica leader election for partitions [__consumer_offsets,2]
    2018-08-20 09:01:20,016 INFO kafka.controller.PartitionStateMachine: [Partition state machine on Controller 180]: Invoking state change to OnlinePartition for partitions [__consumer_offsets,2]
    2018-08-20 09:01:20,019 INFO kafka.controller.PreferredReplicaPartitionLeaderSelector: [PreferredReplicaPartitionLeaderSelector]: Current leader 180 for partition [__consumer_offsets,2] is not the preferred replica. Triggering preferred replica leader election
    2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch 37 encountered error while electing leader for partition [__consumer_offsets,2] due to: Preferred replica 182 for partition [__consumer_offsets,2] is either not alive or not in the isr. Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
    2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch 37 initiated state change for partition [__consumer_offsets,2] from OnlinePartition to OnlinePartition failed
    kafka.common.StateChangeFailedException: encountered error while electing leader for partition [__consumer_offsets,2] due to: Preferred replica 182 for partition [__consumer_offsets,2] is either not alive or not in the isr. Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:330)
        at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:166)
        at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
        at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
        at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
        at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:581)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1126)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1119)
        at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
        at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1119)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1106)
        at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
        at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance(KafkaController.scala:1106)
        at kafka.controller.KafkaController$AutoPreferredReplicaLeaderElection$.process(KafkaController.scala:1380)
        at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:50)
        at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
        at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:49)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
    Caused by: kafka.common.StateChangeFailedException: Preferred replica 182 for partition [__consumer_offsets,2] is either not alive or not in the isr. Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}]
        at kafka.controller.PreferredReplicaPartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:157)
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:308)
        ... 24 more
    2018-08-20 09:01:20,020 WARN kafka.controller.KafkaController: [Controller 180]: Partition [__consumer_offsets,2] failed to complete preferred replica leader election. Leader is 180
    2018-08-20 09:01:24,396 INFO kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-0-182]: Stopped
    2018-08-20 09:01:24,397 INFO kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-0-182]: Shutdown completed

Our Kafka is part of the Cloudera distribution, *CDH 5.13.1*, and the Kafka version running is *0.11.0-kafka-3.0.0*. It may also be important to note that we are manually committing the offsets after processing each batch of data within the Spark job. A few important Kafka properties are as follows:

    group.id=spark-job-consumer-group
    auto.offset.reset=latest
    enable.auto.commit=false
We still don't have a root cause for the issue, and it is worrisome because this behaviour is not really acceptable in production. Any help or pointers would be really appreciated. Please also let me know if you need any more information from my side.

Thanks & Regards
Biplob Biswas