[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959803#comment-16959803 ]

Tim Van Laer edited comment on KAFKA-7447 at 10/25/19 2:45 PM:
---------------------------------------------------------------

During offset loading on the recovering broker, the following exception was 
thrown. I don't know whether it could be the cause of this issue.
{code:java}
[2019-10-25 05:09:08,151] INFO [GroupMetadataManager brokerId=2] Scheduling loading of offsets and group metadata from __consumer_offsets-0 (kafka.coordinator.group.GroupMetadataManager)
...
[2019-10-25 05:09:12,972] ERROR [GroupMetadataManager brokerId=2] Error loading offsets from __consumer_offsets-0 (kafka.coordinator.group.GroupMetadataManager)
java.util.NoSuchElementException: key not found: redacted-kafkastreams-application-4cbd9546-7b7b-4436-942f-26abae8aeb19-StreamThread-1-consumer-d994fd76-b31e-44ad-a27a-6f43ff0e9e78
    at scala.collection.MapLike.default(MapLike.scala:235)
    at scala.collection.MapLike.default$(MapLike.scala:234)
    at scala.collection.AbstractMap.default(Map.scala:63)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:69)
    at kafka.coordinator.group.GroupMetadata.get(GroupMetadata.scala:203)
    at kafka.coordinator.group.GroupCoordinator.$anonfun$tryCompleteHeartbeat$1(GroupCoordinator.scala:927)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198)
    at kafka.coordinator.group.GroupCoordinator.tryCompleteHeartbeat(GroupCoordinator.scala:920)
    at kafka.coordinator.group.DelayedHeartbeat.tryComplete(DelayedHeartbeat.scala:34)
    at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
    at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:388)
    at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:294)
    at kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextExpiration(GroupCoordinator.scala:737)
    at kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextHeartbeatExpiration(GroupCoordinator.scala:730)
    at kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$3(GroupCoordinator.scala:677)
    at kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$3$adapted(GroupCoordinator.scala:677)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$1(GroupCoordinator.scala:677)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198)
    at kafka.coordinator.group.GroupCoordinator.onGroupLoaded(GroupCoordinator.scala:670)
    at kafka.coordinator.group.GroupCoordinator.$anonfun$handleGroupImmigration$1(GroupCoordinator.scala:682)
    at kafka.coordinator.group.GroupCoordinator.$anonfun$handleGroupImmigration$1$adapted(GroupCoordinator.scala:682)
    at kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$23(GroupMetadataManager.scala:646)
    at kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$23$adapted(GroupMetadataManager.scala:641)
    at scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
    at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
    at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
    at kafka.coordinator.group.GroupMetadataManager.doLoadGroupsAndOffsets(GroupMetadataManager.scala:641)
    at kafka.coordinator.group.GroupMetadataManager.loadGroupsAndOffsets(GroupMetadataManager.scala:500)
    at kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleLoadGroupAndOffsets$2(GroupMetadataManager.scala:491)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}

This very much looks like KAFKA-8896, which is fixed in 2.2.2. I'll give that a 
try first.
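
The trace bottoms out in scala.collection.mutable.HashMap.apply, which throws for a 
missing key (unlike .get, which returns an Option); here it is reached from 
GroupMetadata.get while a delayed heartbeat is being completed during the group load, 
apparently for a member id that is no longer in the group's member map. A minimal 
sketch of that failure mode, with made-up map contents:
{code}
import scala.collection.mutable

object MissingMemberSketch extends App {
  // Stand-in for the group's member map; the real keys are member ids like the
  // redacted StreamThread consumer id in the log above.
  val members = mutable.HashMap("member-a" -> "metadata")

  println(members.get("member-b"))  // None: the Option-returning lookup is safe
  println(members("member-b"))      // HashMap.apply throws NoSuchElementException: key not found: member-b
}
{code}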


> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7447
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7447
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.1, 2.0.0
>            Reporter: Ben Isaacs
>            Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continue as normal thanks to replication (consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes a flurry 
> of production errors occurs and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12+ hours) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance happens too quickly 
> after the downed broker returns, before it has had a chance to become fully 
> synchronised on all partitions. In particular, it seems that a consumer 
> offset that is ahead of the most recent data on the topic that consumer was 
> following causes the consumer to be reset to 0 (a sketch of this client-side 
> fallback follows this list).
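> A minimal, hypothetical sketch of that fallback (broker address, group and topic 
> names are placeholders, not taken from this cluster): when the consumer's position 
> is ahead of what the leader now has, the fetch comes back out of range and the 
> client falls back to auto.offset.reset. With "earliest" that means rewinding to 
> the start of the log, which would look exactly like the replay described above; 
> with "none" the application would see an OffsetOutOfRangeException instead.
> {code}
> import java.time.Duration
> import java.util.{Collections, Properties}
> import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
> import org.apache.kafka.common.TopicPartition
> import org.apache.kafka.common.serialization.StringDeserializer
>
> object OutOfRangeResetSketch extends App {
>   val props = new Properties()
>   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "prod-kafka-1:9092")  // placeholder
>   props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")               // placeholder
>   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
>   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
>   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")           // the replay-from-the-start policy
>
>   val tp = new TopicPartition("example-topic", 0)                          // placeholder
>   val consumer = new KafkaConsumer[String, String](props)
>   consumer.assign(Collections.singletonList(tp))
>   consumer.seek(tp, Long.MaxValue)      // position far beyond the leader's log end offset
>   consumer.poll(Duration.ofSeconds(1))  // out-of-range fetch -> reset according to the policy
>   println(s"position after reset: ${consumer.position(tp)}")  // back at the log start
>   consumer.close()
> }
> {code}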
> *Expected:*
>  * bringing a node back from a clean shutdown does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage, triggered at the 
> point that auto leadership rebalance occurs after a cleanly shut-down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely (see the configuration sketch 
> below). 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
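> For reference, a minimal sketch of how this workaround maps onto configuration. 
> The broker setting below is the standard one (it defaults to true); the election 
> command is the tool shipped with these Kafka versions, shown with a placeholder 
> ZooKeeper address:
> {code}
> # server.properties: stop the controller from moving leadership back automatically
> auto.leader.rebalance.enable=false
>
> # Later, once all replicas are back in sync, trigger the preferred-leader election
> # by hand, for example:
> #   kafka-preferred-replica-election.sh --zookeeper prod-kafka-1:2181
> {code}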
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> system topics such as __consumer_offsets). __consumer_offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested that I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages to only those regarding 
> __consumer_offsets-29 because it's just too much to paste otherwise.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,288] INFO [GroupMetadataManager brokerId=1] Finished 
> loading offsets and group metadata from __consumer_offsets-29 in 0 
> milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> {code}
> prod-kafka-3: struggling to agree with prod-kafka-2. Kicks it out of ISR, but 
> then fights with ZooKeeper. Perhaps 2 and 3 both think they're leader?
> {code}
> [2018-09-17 09:24:15,372] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:24:15,377] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> {code}
> prod-kafka-2: rudely kicks BOTH of the other two replicas out of the ISR 
> list, even though 2 is the one we just restarted and therefore is most likely 
> behind. (Bear in mind that it already decided to truncate the topic to 0!)
> {code}
> [2018-09-17 09:24:16,481] INFO [Partition __consumer_offsets-29 broker=1] 
> Shrinking ISR from 0,2,1 to 1 (kafka.cluster.Partition)
> {code}
> prod-kafka-3: still fighting with zookeeper. Eventually loses. (A sketch of 
> the conditional update it keeps losing follows this log block.)
> {code}
> [2018-09-17 09:24:20,374] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:24:20,378] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:24:25,347] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:24:25,350] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:24:30,359] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:24:30,362] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:24:35,365] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:24:35,368] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:24:40,352] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:24:40,354] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:24:45,422] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:24:45,425] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:24:50,345] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:24:50,348] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:24:55,444] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:24:55,449] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:00,340] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:00,343] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:05,374] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:05,377] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:10,342] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:10,344] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:15,348] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:15,351] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:20,338] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:20,340] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:25,338] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:25,340] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:30,382] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:30,387] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:35,341] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:35,344] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:40,460] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:40,465] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2018-09-17 09:25:45,335] INFO [Partition __consumer_offsets-29 broker=2] 
> Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
>  [2018-09-17 09:25:45,338] INFO [Partition __consumer_offsets-29 broker=2] 
> Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> {code}
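> For what it's worth, the repeated "Cached zkVersion [1582] not equal to that in 
> zookeeper, skip updating ISR" lines correspond to a failed conditional write: the 
> broker tries to update the partition-state znode using the znode version it has 
> cached, and ZooKeeper rejects the write because something else (the controller, or 
> the other would-be leader) has already bumped it. A rough sketch of that pattern 
> with the plain ZooKeeper client (not Kafka's actual code; path and payload are 
> illustrative):
> {code}
> import org.apache.zookeeper.{KeeperException, ZooKeeper}
>
> object IsrUpdateSketch {
>   // Conditional update: only succeeds if the znode's current version matches the
>   // version this broker cached when it last read the partition state.
>   def tryShrinkIsr(zk: ZooKeeper, statePath: String, newState: Array[Byte], cachedZkVersion: Int): Boolean =
>     try {
>       zk.setData(statePath, newState, cachedZkVersion)
>       true                                         // write accepted, ISR shrunk
>     } catch {
>       case _: KeeperException.BadVersionException =>
>         false                                      // stale cache -> "skip updating ISR" and retry later
>     }
> }
> {code}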
> prod-kafka-1: suddenly gets confused and also re-inits to 0, as prod-kafka-2 
> apparently becomes leader.
> {code}
> [2018-09-17 09:25:48,807] INFO [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Remote broker is not the leader for partition 
> __consumer_offsets-29, which could indicate that the partition is being moved 
> (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: finally decides that prod-kafka-2 is in charge, truncates 
> accordingly
> {code}
> [2018-09-17 09:25:48,806] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:25:48,807] INFO [ReplicaFetcherManager on broker 2] Added 
> fetcher for partitions List([__consumer_offsets-29, initOffset 0 to broker 
> BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:25:48,809] INFO [GroupMetadataManager brokerId=2] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:25:48,810] INFO [GroupMetadataManager brokerId=2] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:25:48,950] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
>  [2018-09-17 09:25:48,951] INFO [Log partition=__consumer_offsets-29, 
> dir=/var/lib/kafka/data] Truncating to 0 has no effect as the largest offset 
> in the log is -1 (kafka.log.Log)
> {code}
> prod-kafka-1: leadership inauguration confirmed.
> {code}
> [2018-09-17 09:25:50,207] INFO [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Remote broker is not the leader for partition 
> __consumer_offsets-29, which could indicate that the partition is being moved 
> (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-2: now that it has asserted its dominance via zookeeper, 
> prod-kafka-3 is added to the ISR list.
> {code}
> [2018-09-17 09:25:50,210] INFO [Partition __consumer_offsets-29 broker=1] 
> Expanding ISR from 1 to 1,2 (kafka.cluster.Partition)
> {code}
> prod-kafka-1: still struggling to accept reality, but eventually also 
> truncates to 0.
> {code}
> [2018-09-17 09:25:51,430] INFO [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Remote broker is not the leader for partition 
> __consumer_offsets-29, which could indicate that the partition is being moved 
> (kafka.server.ReplicaFetcherThread)
>  [2018-09-17 09:25:52,615] INFO [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Remote broker is not the leader for partition 
> __consumer_offsets-29, which could indicate that the partition is being moved 
> (kafka.server.ReplicaFetcherThread)
>  [2018-09-17 09:25:53,637] INFO [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Remote broker is not the leader for partition 
> __consumer_offsets-29, which could indicate that the partition is being moved 
> (kafka.server.ReplicaFetcherThread)
>  [2018-09-17 09:25:54,150] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:25:54,151] INFO [ReplicaFetcherManager on broker 0] Added 
> fetcher for partitions List([__consumer_offsets-29, initOffset 0 to broker 
> BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:25:54,151] INFO [GroupMetadataManager brokerId=0] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:25:54,153] INFO [GroupMetadataManager brokerId=0] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:25:54,261] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
>  [2018-09-17 09:25:54,261] INFO [Log partition=__consumer_offsets-29, 
> dir=/var/lib/kafka/data] Truncating to 0 has no effect as the largest offset 
> in the log is -1 (kafka.log.Log)
> {code}
> prod-kafka-2: completes its coup of consumer offsets, all is now 0.
> {code}
> [2018-09-17 09:25:56,244] INFO [Partition __consumer_offsets-29 broker=1] 
> Expanding ISR from 1,2 to 1,2,0 (kafka.cluster.Partition)
> {code}
>  
> Retrospectively, I realise that I have omitted any logs to do with leadership 
> rebalancing. Nevertheless, as mentioned before, it's consistently 
> reproducible, and it's also easy to work around by disabling leadership 
> rebalance entirely.
>  
> *Configuration:*
> *kafka server.properties file*
> {code}
> broker.id=1
> default.replication.factor=3
> auto.create.topics.enable=false
> min.insync.replicas=2
> num.network.threads=12
> num.io.threads=16
> num.replica.fetchers=6
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/var/lib/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=4
> offsets.retention.minutes=10080
> offsets.topic.replication.factor=3
> transaction.state.log.replication.factor=3
> transaction.state.log.min.isr=2
> log.flush.interval.messages=20000
> log.flush.interval.ms=10000
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=60000
> zookeeper.connect=prod-kafka-1:2181,prod-kafka-2:2181,prod-kafka-3:2181
> zookeeper.connection.timeout.ms=6000
> confluent.support.metrics.enable=false
> confluent.support.customer.id=anonymous
> group.initial.rebalance.delay.ms=3000
> {code}
> *zookeeper.properties file*
> {code}
> tickTime=2000
> initLimit=10
> syncLimit=5
> dataDir=/var/lib/zookeeper
> clientPort=2181
> server.1=prod-kafka-1:2888:3888
> server.2=prod-kafka-2:2888:3888
> server.3=prod-kafka-3:2888:3888
> autopurge.purgeInterval=12
> autopurge.snapRetainCount=6
> {code}
>  


