What is the value set for offsets.retention.minutes?  Default value is
`1day` in Kafka 0.11 release.
Based on your requirement, you may need to increase this value.

On Mon, Aug 27, 2018 at 1:33 PM Biplob Biswas <revolutioni...@gmail.com>
wrote:

> Hi,
>
> Can anyone provide any insights to the issue we faced?
>
> Thanks & Regards
> Biplob Biswas
>
>
> On Fri, Aug 24, 2018 at 4:50 PM Biplob Biswas <revolutioni...@gmail.com>
> wrote:
>
> > Hi everyone,
> >
> > *Short Version:*
> > We had an unusual situation where after restart of our spark job, rather
> > than reading from the last known offset of the consumer group, it started
> > consuming from the latest offset, thus leading to data loss.
> >
> > *Long Version:*
> > We have a spark job which crashed last Friday due to an NPE introduced by
> > developer code. Over the weekend the job was not restarted, although on
> > monday the bug was fixed and the job was restarted. The consumer-group
> was
> > *NOT* changed, but when we restarted the job, the spark job started
> > reading form the latest offset.
> >
> > We started investigating the reasons for the same and we verified our
> > spark job configuration and it looks good, we tried to reproduce the
> issue
> > by killing the job and restarting while new events are coming in and it
> > reads the buffered data properly. We also verified that the consumer
> group
> > had a valid lag which was reducing over time.
> >
> > Next we checked __consumer_offsets topic and we found the something which
> > could explain why spark job started reading fro latest. We found the
> > following lines which were not compressed by the topic itself
> >
> > [spark-job-consumer-group,spark-job-reading-topic,1]::NULL
> >> [spark-job-consumer-group,spark-job-reading-topic,3]::NULL
> >> [spark-job-consumer-group,spark-job-reading-topic,0]::NULL
> >> [spark-job-consumer-group,spark-job-reading-topic,2]::NULL
> >
> > --
> > These following offset commits came after we started ingesting again on
> > monday after bug fix:
> >
> >
> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime
> >> 1534778505013,ExpirationTime 1534864905013]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime
> >> 1534778505013,ExpirationTime 1534864905013]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime
> >> 1534778505013,ExpirationTime 1534864905013]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime
> >> 1534778505013,ExpirationTime 1534864905013]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime
> >> 1534778508010,ExpirationTime 1534864908010]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime
> >> 1534778508010,ExpirationTime 1534864908010]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime
> >> 1534778508010,ExpirationTime 1534864908010]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime
> >> 1534778508010,ExpirationTime 1534864908010]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime
> >> 1534778511010,ExpirationTime 1534864911010]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime
> >> 1534778511010,ExpirationTime 1534864911010]
> >>
> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime
> >> 1534778511010,ExpirationTime 1534864911010]
> >
> >
> >
> > But this only showed the reason of the behaviour not the cause, so we
> > looked into kafka logs -  during the time from when our kafka job died
> and
> > when we restarted, we found a few exception although these doesn't look
> > like that it could be the root cause but I still wanted to make sure:
> >
> >>
> >> 2018-08-20 09:01:20,016 INFO kafka.controller.KafkaController:
> >> [Controller 180]: Starting preferred replica leader election for
> partitions
> >> [__consumer_offsets,2]
> >> 2018-08-20 09:01:20,016 INFO kafka.controller.PartitionStateMachine:
> >> [Partition state machine on Controller 180]: Invoking state change to
> >> OnlinePartition for partitions [__consumer_offsets,2]
> >> 2018-08-20 09:01:20,019 INFO
> >> kafka.controller.PreferredReplicaPartitionLeaderSelector:
> >> [PreferredReplicaPartitionLeaderSelector]: Current leader 180 for
> partition
> >> [__consumer_offsets,2] is not the preferred replica. Triggering
> preferred
> >> replica leader election
> >> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch
> >> 37 encountered error while electing leader for partition
> >> [__consumer_offsets,2] due to: Preferred replica 182 for partition
> >> [__consumer_offsets,2] is either not alive or not in the isr. Current
> >> leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
> >> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch
> >> 37 initiated state change for partition [__consumer_offsets,2] from
> >> OnlinePartition to OnlinePartition failed
> >> kafka.common.StateChangeFailedException: encountered error while
> electing
> >> leader for partition [__consumer_offsets,2] due to: Preferred replica
> 182
> >> for partition [__consumer_offsets,2] is either not alive or not in the
> isr.
> >> Current leader and ISR:
> [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
> >>         at
> >>
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:330)
> >>         at
> >>
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:166)
> >>         at
> >>
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
> >>         at
> >>
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
> >>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
> >>         at
> >>
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
> >>         at
> >>
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:581)
> >>         at
> >>
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1126)
> >>         at
> >>
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1119)
> >>         at
> >>
> scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
> >>         at
> >>
> scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
> >>         at
> >>
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> >>         at
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> >>         at
> >> scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
> >>         at
> >>
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1119)
> >>         at
> >>
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1106)
> >>         at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
> >>         at
> >>
> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance(KafkaController.scala:1106)
> >>         at
> >>
> kafka.controller.KafkaController$AutoPreferredReplicaLeaderElection$.process(KafkaController.scala:1380)
> >>         at
> >>
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:50)
> >>         at
> >>
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
> >>         at
> >>
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
> >>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> >>         at
> >>
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:49)
> >>         at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> >> Caused by: kafka.common.StateChangeFailedException: Preferred replica
> 182
> >> for partition [__consumer_offsets,2] is either not alive or not in the
> isr.
> >> Current leader and ISR:
> [{"leader":180,"leader_epoch":2,"isr":[181,180]}]
> >>         at
> >>
> kafka.controller.PreferredReplicaPartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:157)
> >>         at
> >>
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:308)
> >>         ... 24 more
> >> 2018-08-20 09:01:20,020 WARN kafka.controller.KafkaController:
> >> [Controller 180]: Partition [__consumer_offsets,2] failed to complete
> >> preferred replica leader election. Leader is 180
> >> 2018-08-20 09:01:24,396 INFO kafka.server.ReplicaFetcherThread:
> >> [ReplicaFetcherThread-0-182]: Stopped
> >> 2018-08-20 09:01:24,397 INFO kafka.server.ReplicaFetcherThread:
> >> [ReplicaFetcherThread-0-182]: Shutdown completed
> >
> >
> >
> > Our Kafka is part of the Cloudera Distribution -  *CDH 5.13.1*, the
> > version of kafka running is Kafka version : *0.11.0-kafka-3.0.0*, also it
> > maybe important to note that we are manually committing the offsets after
> > processing each batch of data within the spark job.Few important kafka
> > properties are as follows:
> >
> >>
> >> *group.id <http://group.id/>=spark-job-consumer-group*
> >> *auto.offset.reset=latest**enable.auto.commit=false *
> >
> >
> > We still don't have any root cause for the issue and its worrisome as
> this
> > behaviour on production is not really acceptable. Any help or pointers
> > would be really appreciated.
> > Also please let me know if you need any more information from my
> >
> > Thanks & Regards
> > Biplob Biswas
> >
>

Reply via email to