Hi everyone,

*Short Version:*
We had an unusual situation where, after a restart of our Spark job, instead
of resuming from the last committed offset of its consumer group, it started
consuming from the latest offset, leading to data loss.

*Long Version:*
We have a Spark job which crashed last Friday due to an NPE introduced by
developer code. The job was not restarted over the weekend; on Monday the bug
was fixed and the job was restarted. The consumer group was *NOT* changed, yet
when we restarted the job it started reading from the latest offset.

We started investigating. We verified our Spark job configuration and it looks
good. We tried to reproduce the issue by killing the job and restarting it
while new events were coming in, and it read the buffered data properly. We
also verified that the consumer group had a valid lag which was reducing over
time.
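
For reference, the lag check boils down to comparing the group's committed
offsets against the log-end offsets of each partition. The snippet below is
only an illustrative sketch of that check, not the exact tooling we ran; the
broker address is a placeholder.

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Standalone sketch: compare the group's committed offsets with the current
// log-end offsets for every partition of the topic the Spark job reads.
object ConsumerGroupLagCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder for our broker list
    props.put("group.id", "spark-job-consumer-group")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    props.put("enable.auto.commit", "false")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val partitions = consumer.partitionsFor("spark-job-reading-topic").asScala
        .map(pi => new TopicPartition(pi.topic, pi.partition))

      val endOffsets = consumer.endOffsets(partitions.asJava).asScala

      partitions.foreach { tp =>
        val end = endOffsets(tp).longValue
        // committed() returns null when the group has no stored offset for the partition
        val committed = Option(consumer.committed(tp)).map(_.offset)
        val lag = committed.map(end - _)
        println(s"$tp committed=${committed.getOrElse("none")} logEnd=$end lag=${lag.getOrElse("n/a")}")
      }
    } finally {
      consumer.close()
    }
  }
}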

Next we checked the __consumer_offsets topic and found something which could
explain why the Spark job started reading from latest. We found the following
entries, which had not yet been compacted away by the topic itself:

> [spark-job-consumer-group,spark-job-reading-topic,1]::NULL
> [spark-job-consumer-group,spark-job-reading-topic,3]::NULL
> [spark-job-consumer-group,spark-job-reading-topic,0]::NULL
> [spark-job-consumer-group,spark-job-reading-topic,2]::NULL

--
The following offset commits came after we started ingesting again on Monday,
after the bug fix:

> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime 1534778505013,ExpirationTime 1534864905013]
> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> [spark-job-consumer-group,spark-job-reading-topic,0]::[OffsetMetadata[607330,NO_METADATA],CommitTime 1534778508010,ExpirationTime 1534864908010]
> [spark-job-consumer-group,spark-job-reading-topic,3]::[OffsetMetadata[606979,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
> [spark-job-consumer-group,spark-job-reading-topic,1]::[OffsetMetadata[607195,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]
> [spark-job-consumer-group,spark-job-reading-topic,2]::[OffsetMetadata[607100,NO_METADATA],CommitTime 1534778511010,ExpirationTime 1534864911010]



But this only explains the behaviour, not its cause, so we looked into the
Kafka logs. In the window between when our Spark job died and when we
restarted it, we found a few exceptions. They don't look like they could be
the root cause, but I still wanted to make sure:

>
> 2018-08-20 09:01:20,016 INFO kafka.controller.KafkaController: [Controller
> 180]: Starting preferred replica leader election for partitions
> [__consumer_offsets,2]
> 2018-08-20 09:01:20,016 INFO kafka.controller.PartitionStateMachine:
> [Partition state machine on Controller 180]: Invoking state change to
> OnlinePartition for partitions [__consumer_offsets,2]
> 2018-08-20 09:01:20,019 INFO
> kafka.controller.PreferredReplicaPartitionLeaderSelector:
> [PreferredReplicaPartitionLeaderSelector]: Current leader 180 for partition
> [__consumer_offsets,2] is not the preferred replica. Triggering preferred
> replica leader election
> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch 37
> encountered error while electing leader for partition
> [__consumer_offsets,2] due to: Preferred replica 182 for partition
> [__consumer_offsets,2] is either not alive or not in the isr. Current
> leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
> 2018-08-20 09:01:20,019 ERROR state.change.logger: Controller 180 epoch 37
> initiated state change for partition [__consumer_offsets,2] from
> OnlinePartition to OnlinePartition failed
> kafka.common.StateChangeFailedException: encountered error while electing
> leader for partition [__consumer_offsets,2] due to: Preferred replica 182
> for partition [__consumer_offsets,2] is either not alive or not in the isr.
> Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}].
>         at
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:330)
>         at
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:166)
>         at
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
>         at
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
>         at
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
>         at
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:581)
>         at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1126)
>         at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3$$anonfun$apply$14.apply(KafkaController.scala:1119)
>         at
> scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
>         at
> scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at
> scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
>         at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1119)
>         at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:1106)
>         at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
>         at
> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance(KafkaController.scala:1106)
>         at
> kafka.controller.KafkaController$AutoPreferredReplicaLeaderElection$.process(KafkaController.scala:1380)
>         at
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:50)
>         at
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
>         at
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:49)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.StateChangeFailedException: Preferred replica 182
> for partition [__consumer_offsets,2] is either not alive or not in the isr.
> Current leader and ISR: [{"leader":180,"leader_epoch":2,"isr":[181,180]}]
>         at
> kafka.controller.PreferredReplicaPartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:157)
>         at
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:308)
>         ... 24 more
> 2018-08-20 09:01:20,020 WARN kafka.controller.KafkaController: [Controller
> 180]: Partition [__consumer_offsets,2] failed to complete preferred replica
> leader election. Leader is 180
> 2018-08-20 09:01:24,396 INFO kafka.server.ReplicaFetcherThread:
> [ReplicaFetcherThread-0-182]: Stopped
> 2018-08-20 09:01:24,397 INFO kafka.server.ReplicaFetcherThread:
> [ReplicaFetcherThread-0-182]: Shutdown completed



Our Kafka is part of the Cloudera Distribution, *CDH 5.13.1*; the Kafka
version running is *0.11.0-kafka-3.0.0*. It may also be important to note
that we are manually committing the offsets after processing each batch of
data within the Spark job. A few important Kafka properties are as follows:

>
> *group.id=spark-job-consumer-group*
> *auto.offset.reset=latest*
> *enable.auto.commit=false*
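
For context, the read-and-commit pattern inside the job looks roughly like
the sketch below (simplified: streaming context setup, serialization and the
actual batch processing are omitted, and the broker list is a placeholder):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

object SparkJobSketch {
  // ssc is the StreamingContext built elsewhere in the job
  def buildStream(ssc: StreamingContext) = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092", // placeholder for our broker list
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-job-consumer-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("spark-job-reading-topic"), kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Offset ranges covered by this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process the batch here ...

      // Commit this batch's offsets back to Kafka only after processing succeeded
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    stream
  }
}

The commit is issued only after a batch has been processed, which is why we
expected the group's committed offsets to still be in place and the job to
resume from them when we restarted on Monday.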


We still don't have a root cause for the issue, and it is worrisome because
this behaviour is not acceptable in production. Any help or pointers would be
really appreciated. Also, please let me know if you need any more information
from my side.

Thanks & Regards
Biplob Biswas
