Hi,

Can anyone provide any insight into the issue we faced?

Thanks & Regards
Biplob Biswas


On Fri, Aug 24, 2018 at 4:50 PM Biplob Biswas <revolutioni...@gmail.com>
wrote:

> Hi everyone,
>
> *Short Version:*
> We had an unusual situation where, after a restart of our Spark job, it
> started consuming from the latest offset rather than from the last
> committed offset of the consumer group, which led to data loss.
>
> *Long Version:*
> We have a Spark job which crashed last Friday due to an NPE introduced by
> developer code. The job was not restarted over the weekend; on Monday the
> bug was fixed and the job was restarted. The consumer group was
> *NOT* changed, but when we restarted the job, the Spark job started
> reading from the latest offset.
>
> We started investigating the reasons for this. We verified our Spark job
> configuration and it looks good. We tried to reproduce the issue by
> killing the job and restarting it while new events were coming in, and it
> read the buffered data properly. We also verified that the consumer group
> had a valid lag which was reducing over time.
>
> Next we checked the __consumer_offsets topic and found something which
> could explain why the Spark job started reading from latest. We found the
> following records, which had not yet been compacted away by the topic:
>
> [spark-job-consumer-group,spark-job-reading-topic,1]::NULL
>> [spark-job-consumer-group,spark-job-reading-topic,3]::NULL
>> [spark-job-consumer-group,spark-job-reading-topic,0]::NULL
>> [spark-job-consumer-group,spark-job-reading-topic,2]::NULL
>
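A NULL value in a dump of __consumer_offsets is a tombstone: it marks the committed offset for that group/topic/partition as removed. Below is a minimal Python sketch that lists the partitions whose most recent record is a tombstone. It assumes the dump has been saved with one record per line in the format shown above; the function names are made up for illustration.

```python
import re

# One record of the dump quoted above, either
#   [group,topic,partition]::NULL
# or
#   [group,topic,partition]::[OffsetMetadata[...],CommitTime ...,ExpirationTime ...]
LINE_RE = re.compile(r"^\[([^,\]]+),([^,\]]+),(\d+)\]::(.*)$")


def parse_offset_line(line):
    """Return (group, topic, partition, is_tombstone), or None if the line does not match."""
    match = LINE_RE.match(line.strip())
    if match is None:
        return None
    group, topic, partition, value = match.groups()
    return group, topic, int(partition), value == "NULL"


def tombstoned_partitions(lines, group, topic):
    """Partitions of (group, topic) whose last record in `lines` is a NULL value."""
    latest = {}
    for line in lines:
        parsed = parse_offset_line(line)
        if parsed and parsed[0] == group and parsed[1] == topic:
            latest[parsed[2]] = parsed[3]  # later records overwrite earlier ones
    return sorted(p for p, is_null in latest.items() if is_null)


sample = [
    "[spark-job-consumer-group,spark-job-reading-topic,1]::NULL",
    "[spark-job-consumer-group,spark-job-reading-topic,3]::NULL",
    "[spark-job-consumer-group,spark-job-reading-topic,0]::NULL",
    "[spark-job-consumer-group,spark-job-reading-topic,2]::NULL",
]
print(tombstoned_partitions(sample, "spark-job-consumer-group", "spark-job-reading-topic"))
```

Run against the four records above, this reports all four partitions, which would mean the group had no committed offset left for any partition at that point.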
> --
> The following offset commits came after we started ingesting again on
> Monday, after the bug fix:
>
> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime
>> 1534778505013,ExpirationTime 1534864905013]
>> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime
>> 1534778505013,ExpirationTime 1534864905013]
>> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime
>> 1534778505013,ExpirationTime 1534864905013]
>> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime
>> 1534778505013,ExpirationTime 1534864905013]
>> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime
>> 1534778508010,ExpirationTime 1534864908010]
>> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime
>> 1534778508010,ExpirationTime 1534864908010]
>> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime
>> 1534778508010,ExpirationTime 1534864908010]
>> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime
>> 1534778508010,ExpirationTime 1534864908010]
>> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime
>> 1534778511010,ExpirationTime 1534864911010]
>> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime
>> 1534778511010,ExpirationTime 1534864911010]
>> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime
>> 1534778511010,ExpirationTime 1534864911010]
>
>
>
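In every commit record above, ExpirationTime minus CommitTime is the same. A small Python sketch using only the numbers from the first record shows that the gap is exactly 24 hours. That would be consistent with the broker-side offsets.retention.minutes setting, whose default in Kafka 0.11 is 1440 minutes. Whether that setting is at its default on this cluster is an assumption, not something the quoted records confirm.

```python
from datetime import datetime, timezone

# Values copied from the first commit record quoted above (epoch milliseconds).
commit_time_ms = 1534778505013
expiration_time_ms = 1534864905013

retention_ms = expiration_time_ms - commit_time_ms
retention_hours = retention_ms / (1000 * 60 * 60)


def to_utc(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


print("committed at:", to_utc(commit_time_ms).isoformat())
print("expires at:  ", to_utc(expiration_time_ms).isoformat())
print("retention:   ", retention_hours, "hours")
```

If the retention really is 24 hours, a group that stays down from Friday to Monday would see its committed offsets expire, after which auto.offset.reset=latest decides where the consumer starts.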
> But this only explained the behaviour, not its cause, so we looked into
> the Kafka logs. For the period between the time our job died and the time
> we restarted it, we found a few exceptions. They don't look like they
> could be the root cause, but I still wanted to make sure:
>
>>
>> 2018-08-20 09:01:20,016 INFO kafka.controller.KafkaController:
>> [Controller 180]: Starting preferred replica leader election for partitions
>> [__consumer_offsets,2]
>> 2018-08-20 09:01:20,016 INFO kafka.controller.PartitionStateMachine:
>> [Partition state machine on Controller 180]: Invoking state change to
>> OnlinePartition for partitions [__consumer_offsets,2]
>> 2018-08-20 09:01:20,019 INFO
>> kafka.controller.PreferredReplicaPartitionLeaderSelector:
>> [PreferredReplicaPartitionLeaderSelector]: Current leader 180 for partition
>> [__consumer_offsets,2] is not the preferred replica. Triggering preferred
>> replica leader election
>> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch
>> 37 encountered error while electing leader for partition
>> [__consumer_offsets,2] due to: Preferred replica 182 for partition
>> [__consumer_offsets,2] is either not alive or not in the isr. Current
>> leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
>> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch
>> 37 initiated state change for partition [__consumer_offsets,2] from
>> OnlinePartition to OnlinePartition failed
>> kafka.common.StateChangeFailedException: encountered error while electing
>> leader for partition [__consumer_offsets,2] due to: Preferred replica 182
>> for partition [__consumer_offsets,2] is either not alive or not in the isr.
>> Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
>>         at
>> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:330)
>>         at
>> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:166)
>>         at
>> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
>>         at
>> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
>>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
>>         at
>> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
>>         at
>> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:581)
>>         at
>> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1126)
>>         at
>> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1119)
>>         at
>> scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
>>         at
>> scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
>>         at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>>         at
>> scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
>>         at
>> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1119)
>>         at
>> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1106)
>>         at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
>>         at
>> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance(KafkaController.scala:1106)
>>         at
>> kafka.controller.KafkaController$AutoPreferredReplicaLeaderElection$.process(KafkaController.scala:1380)
>>         at
>> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:50)
>>         at
>> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
>>         at
>> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>>         at
>> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:49)
>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
>> Caused by: kafka.common.StateChangeFailedException: Preferred replica 182
>> for partition [__consumer_offsets,2] is either not alive or not in the isr.
>> Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}]
>>         at
>> kafka.controller.PreferredReplicaPartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:157)
>>         at
>> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:308)
>>         ... 24 more
>> 2018-08-20 09:01:20,020 WARN kafka.controller.KafkaController:
>> [Controller 180]: Partition [__consumer_offsets,2] failed to complete
>> preferred replica leader election. Leader is 180
>> 2018-08-20 09:01:24,396 INFO kafka.server.ReplicaFetcherThread:
>> [ReplicaFetcherThread-0-182]: Stopped
>> 2018-08-20 09:01:24,397 INFO kafka.server.ReplicaFetcherThread:
>> [ReplicaFetcherThread-0-182]: Shutdown completed
>
>
>
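The controller log above says the preferred replica (broker 182) was not in the ISR for [__consumer_offsets,2], so the preferred-leader election was skipped and broker 180 stayed leader. A minimal Python sketch of that check, using only the values from the log line (the function name is made up for illustration):

```python
import json

# "Current leader and ISR" payload copied from the controller log above.
state = json.loads('[{"leader":180,"leader_epoch":2,"isr":[181,180]}]')[0]
preferred_replica = 182  # from "Preferred replica 182 ..." in the same log line


def can_elect_preferred(preferred, partition_state):
    """A preferred replica can only take over leadership if it is in the ISR."""
    return preferred in partition_state["isr"]


print("leader:", state["leader"])
print("preferred replica electable:", can_elect_preferred(preferred_replica, state))
```

The partition still had a leader (180) and a second in-sync replica (181) at that time, so on its own this error does not seem to explain the missing offsets, though that is a reading of this one log excerpt only.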
> Our Kafka is part of the Cloudera Distribution, *CDH 5.13.1*, and the
> Kafka version is *0.11.0-kafka-3.0.0*. It may also be important to note
> that we commit the offsets manually after processing each batch of data
> within the Spark job. A few important Kafka properties are as follows:
>
>>
>> *group.id=spark-job-consumer-group*
>> *auto.offset.reset=latest*
>> *enable.auto.commit=false*
>
>
> We still don't have a root cause for the issue, and it's worrisome because
> this behaviour is not acceptable in production. Any help or pointers
> would be really appreciated.
> Also, please let me know if you need any more information from my side.
>
> Thanks & Regards
> Biplob Biswas
>
