[ 
https://issues.apache.org/jira/browse/KAFKA-2575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ray Chiang updated KAFKA-2575:
------------------------------
    Component/s: offset manager

> inconsistent offset count in replication-offset-checkpoint during leader 
> election leads to huge exceptions
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2575
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2575
>             Project: Kafka
>          Issue Type: Bug
>          Components: offset manager
>    Affects Versions: 0.8.2.1
>            Reporter: Warren Jin
>            Priority: Major
>
> We have 3 brokers and more than 100 topics in production. The default 
> partition count is 24 per topic, and the replication factor is 3.
> We have noticed the following errors in recent days:
> 2015-09-22 22:25:12,529 ERROR Error on broker 1 while processing LeaderAndIsr 
> request correlationId 438501 received from controller 2 epoch 12 for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,7] (state.change.logger)
> java.io.IOException: Expected 3918 entries but found only 3904
>       at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:99)
>       at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
>       at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1.apply$mcZ$sp(Partition.scala:171)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
>       at kafka.utils.Utils$.inLock(Utils.scala:535)
>       at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
>       at kafka.cluster.Partition.makeLeader(Partition.scala:163)
>       at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:427)
>       at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:426)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>       at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>       at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>       at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:426)
>       at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:378)
>       at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:120)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:63)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>       at java.lang.Thread.run(Thread.java:745)
> The error occurs during leader election for the LOGIST.DELIVERY.SUBSCRIBE 
> partitions; the broker then repeatedly prints the following error messages:
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 14943530 from client ReplicaFetcherThread-2-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,22] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,22] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 15022337 from client ReplicaFetcherThread-1-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,1] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,1] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 15078431 from client ReplicaFetcherThread-0-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,4] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,4] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 13477660 from client ReplicaFetcherThread-2-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,10] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,10] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 15022337 from client ReplicaFetcherThread-1-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,13] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,13] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 15078431 from client ReplicaFetcherThread-0-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,16] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,16] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 14988525 from client ReplicaFetcherThread-3-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,19] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,19] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
> FetchRequest; Version: 0; CorrelationId: 15022337; ClientId: 
> ReplicaFetcherThread-1-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
> RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,1] -> 
> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,13] -> 
> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
> 0's position -1 since the replica is not recognized to be one of the assigned 
> replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,1]
>       at 
> kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
>       at 
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
>       at 
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>       at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>       at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>       at 
> kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>       at java.lang.Thread.run(Thread.java:745)
> 2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
> FetchRequest; Version: 0; CorrelationId: 15078431; ClientId: 
> ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
> RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,4] -> 
> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,16] -> 
> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
> 0's position -1 since the replica is not recognized to be one of the assigned 
> replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,4]
>       at 
> kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
>       at 
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
>       at 
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>       at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>       at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>       at 
> kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>       at java.lang.Thread.run(Thread.java:745)
> I checked the Kafka source code (OffsetCheckpoint.scala, ReplicaManager.scala, 
> LogManager.scala). The replica manager writes the offsets of all 
> partitions to replication-offset-checkpoint every 5 seconds, and each 
> OffsetCheckpoint holds an internal lock for its file, so it should be 
> impossible for the expected count to be 3918 while the actual number of 
> offset entries is 3904. Is this a multithreading issue in which some other 
> thread flushes content to the same file, given that the lock in 
> OffsetCheckpoint is internal to each instance? I'm not familiar with Scala, 
> but in
> class OffsetCheckpoint(val file: File) extends Logging {
>   private val lock = new Object()
> the lock looks like an instance lock, not a static class-level lock.
> If this is a bug in Kafka, is there a quick workaround?
> We restarted the broker and the error disappeared, but the replicas for this 
> topic are still not quite correct, although messages can be produced and 
> consumed.
> Please let me know if you need more information.
> Thanks and best regards,
>  Warren
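
The mismatch reported above (declared count 3918, actual entries 3904) can be checked by hand on a copy of the checkpoint file. The following is only a minimal sketch in Python, not Kafka's own code. It assumes the 0.8.x replication-offset-checkpoint layout of a version line, an entry-count line, and then one "topic partition offset" line per entry. The function name check_checkpoint and the sample contents are hypothetical.

```python
from typing import List, Tuple


def check_checkpoint(text: str) -> Tuple[int, int]:
    """Return (declared_count, actual_count) for checkpoint file contents.

    Assumed layout (an assumption, not taken from the report): line 1 is a
    version number, line 2 is the number of entries, and every following
    non-empty line is "topic partition offset".
    """
    lines: List[str] = [ln for ln in text.splitlines() if ln.strip()]
    if len(lines) < 2:
        raise ValueError("checkpoint needs a version line and a count line")
    declared = int(lines[1])
    entries = []
    for ln in lines[2:]:
        parts = ln.split()
        if len(parts) != 3:
            raise ValueError(f"malformed entry: {ln!r}")
        # Parse partition and offset as integers so bad values fail loudly.
        entries.append((parts[0], int(parts[1]), int(parts[2])))
    return declared, len(entries)


# Hypothetical contents: the header declares 3 entries but only 2 are present,
# the same kind of mismatch as "Expected 3918 entries but found only 3904".
sample = "0\n3\nLOGIST.DELIVERY.SUBSCRIBE 1 100\nLOGIST.DELIVERY.SUBSCRIBE 4 250\n"
declared, actual = check_checkpoint(sample)
if declared != actual:
    print(f"Expected {declared} entries but found only {actual}")
```

Running this against a copy of the file, rather than the live one, avoids racing with the broker's periodic rewrite.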



