[ https://issues.apache.org/jira/browse/KAFKA-2575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-2575:
---------------------------------
    Component/s:     (was: kafka streams)

> Inconsistent offset count in replication-offset-checkpoint during leader election leads to huge numbers of exceptions
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2575
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2575
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1
>            Reporter: Warren Jin
>
> We have 3 brokers and more than 100 topics in production; the default number of partitions per topic is 24, and the replication factor is 3.
> We noticed the following errors in recent days:
>
> 2015-09-22 22:25:12,529 ERROR Error on broker 1 while processing LeaderAndIsr request correlationId 438501 received from controller 2 epoch 12 for partition [LOGIST.DELIVERY.SUBSCRIBE,7] (state.change.logger)
> java.io.IOException: Expected 3918 entries but found only 3904
>         at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:99)
>         at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
>         at kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
>         at kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
>         at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
>         at kafka.cluster.Partition$$anonfun$makeLeader$1.apply$mcZ$sp(Partition.scala:171)
>         at kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
>         at kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
>         at kafka.utils.Utils$.inLock(Utils.scala:535)
>         at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
>         at kafka.cluster.Partition.makeLeader(Partition.scala:163)
>         at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:427)
>         at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:426)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:426)
>         at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:378)
>         at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:120)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:63)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>         at java.lang.Thread.run(Thread.java:745)
>
> This occurred during leader election for the LOGIST.DELIVERY.SUBSCRIBE partitions; afterwards the broker repeatedly printed error messages such as:
>
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 14943530 from client ReplicaFetcherThread-2-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,22] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,22] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,1] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,1] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,4] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,4] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 13477660 from client ReplicaFetcherThread-2-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,10] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,10] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,13] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,13] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,16] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,16] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 14988525 from client ReplicaFetcherThread-3-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,19] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,19] on broker 1 (kafka.server.ReplicaManager)
>
> 2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: FetchRequest; Version: 0; CorrelationId: 15022337; ClientId: ReplicaFetcherThread-1-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,1] -> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,13] -> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 0's position -1 since the replica is not recognized to be one of the assigned replicas for partition [LOGIST.DELIVERY.SUBSCRIBE,1]
>         at kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
>         at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
>         at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
>         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>         at kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>         at java.lang.Thread.run(Thread.java:745)
>
> 2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: FetchRequest; Version: 0; CorrelationId: 15078431; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,4] -> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,16] -> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 0's position -1 since the replica is not recognized to be one of the assigned replicas for partition [LOGIST.DELIVERY.SUBSCRIBE,4]
>         at kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
>         at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
>         at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
>         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>         at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>         at kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>         at java.lang.Thread.run(Thread.java:745)
>
> I checked the Kafka source code (OffsetCheckpoint.scala, ReplicaManager.scala, LogManager.scala): the replica manager writes the offsets of all partitions to replication-offset-checkpoint every 5 seconds, and OffsetCheckpoint takes an internal lock for every read and write of the file, so it should be impossible for the file header to claim 3918 offsets while the file actually contains only 3904 entries. Could this be a multithreading issue, where some other thread flushes content to the same file in spite of the internal lock in OffsetCheckpoint? I'm not familiar with Scala, but in
>
> class OffsetCheckpoint(val file: File) extends Logging {
>   private val lock = new Object()
>
> the lock looks like an instance lock, not a static (class-level) lock.
> If this is an issue in Kafka, is there any quick workaround for this problem? After we restarted the broker the error disappeared, but the replica assignment for this topic still does not look correct, although the topic can produce and consume messages.
> Please let me know if you need more information.
> Thanks and best regards,
> Warren

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
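For context on the IOException in the report: replication-offset-checkpoint is a plain-text file whose first line is a format version, whose second line is the expected number of entries, followed by one "topic partition offset" line per partition. The exception fires when the count on line 2 disagrees with the number of entry lines that follow. The sketch below is an illustrative Java re-implementation of that consistency check under those layout assumptions; the class and method names are hypothetical and this is not Kafka's actual code (which is Scala, in OffsetCheckpoint.scala).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch of the assumed replication-offset-checkpoint layout:
//   line 1: format version
//   line 2: expected number of entries
//   then one "<topic> <partition> <offset>" line per partition.
public class CheckpointReader {
    // Returns the number of entries, or throws the same style of
    // IOException seen in the report ("Expected N entries but found
    // only M") when the header count and the body disagree.
    public static int readAndValidate(Path file) throws IOException {
        List<String> lines = Files.readAllLines(file);
        int expected = Integer.parseInt(lines.get(1).trim());
        long actual = lines.stream().skip(2)
                           .filter(l -> !l.isBlank())
                           .count();
        if (actual != expected) {
            throw new IOException(
                "Expected " + expected + " entries but found only " + actual);
        }
        return expected;
    }
}
```

A mismatch like the reported "Expected 3918 entries but found only 3904" would mean the header and body were written inconsistently, e.g. by a torn or concurrent write to the same file, which is exactly the scenario the reporter's instance-lock question is probing.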