[ https://issues.apache.org/jira/browse/KAFKA-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457169#comment-16457169 ]
Anna Povzner commented on KAFKA-6832:
-------------------------------------

Regarding the LogSegment.read method, you are correct that translateOffset() calls the lookup() method, which returns the largest offset less than or equal to the given targetOffset. However, notice that the offset returned from lookup() is used only as a *starting offset to search from* by log.searchForOffsetWithSize, which is called next and actually searches for the offset that is greater than or equal to the target offset.

The error that you are seeing could be an edge case causing log divergence, described in KAFKA-6361 (see KIP-279), which is currently in progress. That is a fairly rare case, so it may be worth checking which message format version you are using. If you upgraded to Kafka 1.1 but did not upgrade the message format (i.e., you are still on a pre-0.11 message format), log divergence could happen more often. See KIP-101 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation).

> Wrong start position in the log file on the leader, on fetch request.
> ---------------------------------------------------------------------
>
> Key: KAFKA-6832
> URL: https://issues.apache.org/jira/browse/KAFKA-6832
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.0
> Reporter: Ciprian Pascu
> Priority: Major
>
> Hi,
> We have an environment with 3 Kafka brokers. After hard rebooting all brokers (by hard rebooting the VMs on which they are located), we experience a drop in the ISR for the topics that have a replication factor greater than 1. It is caused by the death of some of the replica fetcher threads with the following exception:
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: *kafka.common.KafkaException: Error processing data for partition __consumer_offsets-39 offset 308060*
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at scala.Option.foreach(Option.scala:257)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThrea
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: *Caused by: java.lang.IllegalArgumentException: Out of order offsets found in List(308059, 308060)*
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.log.Log$$anonfun$append$2.apply(Log.scala:683)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.log.Log.maybeHandleIOException(Log.scala:1679)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.log.Log.append(Log.scala:624)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.log.Log.appendAsFollower(Log.scala:607)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: ... 13 more
>
> The replica requests offset *308060*, but it gets a message set containing (*308059, 308060*), which makes the replica thread crash with the above exception. The reason the leader sends a message set with a smaller offset than requested seems to lie in the implementation of the 'read' method of 'LogSegment'. According to its comment, this method should 'Read a message set from this segment beginning with the first offset >= startOffset', but it actually uses the 'translateOffset' method, which in turn uses the 'lookup' method, which, according to its comment, will 'Find the largest offset less than or equal to the given targetOffset'. The code confirms this, so it seems we have a contradiction here.
>
> Ciprian.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
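The two-step read discussed in this thread (a floor lookup in the offset index used only as a starting point, then a forward scan) and the follower-side check that failed can be sketched in Java as below. This is a minimal, hypothetical model, not Kafka's actual LogSegment or FileRecords code: the names FetchOffsetSketch, Batch, addBatch, lookupStartPosition, readFrom and checkFollowerAppend are illustrative, and list positions stand in for file positions. It also rests on two assumptions not stated in the thread: that the forward scan returns the first whole batch whose last offset is >= the target (so the returned data can begin below the requested offset), and that a follower rejects any batch whose first offset is below its own log end offset. The second assumption is at least consistent with the trace, since List(308059, 308060) is in order yet was still rejected for a fetch at 308060.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of one log segment; NOT Kafka's LogSegment/FileRecords classes.
final class FetchOffsetSketch {

    // A record batch covering the inclusive offset range [baseOffset, lastOffset].
    static final class Batch {
        final long baseOffset;
        final long lastOffset;

        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Batches in file order; the list index stands in for a byte position in the segment file.
    private final List<Batch> batches = new ArrayList<>();
    // Sparse offset index: indexed offset -> position of the batch starting at that offset.
    private final TreeMap<Long, Integer> index = new TreeMap<>();

    void addBatch(Batch batch, boolean indexIt) {
        if (indexIt) {
            index.put(batch.baseOffset, batches.size());
        }
        batches.add(batch);
    }

    // Step 1: floor lookup (largest indexed offset <= target), used only as a place to start scanning.
    int lookupStartPosition(long targetOffset) {
        Map.Entry<Long, Integer> entry = index.floorEntry(targetOffset);
        return entry == null ? 0 : entry.getValue();
    }

    // Step 2: scan forward to the first batch whose last offset is >= target.
    // The whole batch is returned, so it may begin below targetOffset (assumption, see lead-in).
    Batch readFrom(long targetOffset) {
        for (int pos = lookupStartPosition(targetOffset); pos < batches.size(); pos++) {
            Batch batch = batches.get(pos);
            if (batch.lastOffset >= targetOffset) {
                return batch;
            }
        }
        return null; // target offset is past the end of this segment
    }

    // Follower side (assumed check): reject a batch that starts below the follower's log end offset.
    static void checkFollowerAppend(long followerLogEndOffset, Batch batch) {
        if (batch.baseOffset < followerLogEndOffset) {
            throw new IllegalArgumentException("Out of order offsets: batch starts at "
                    + batch.baseOffset + " but follower log end offset is " + followerLogEndOffset);
        }
    }

    public static void main(String[] args) {
        FetchOffsetSketch segment = new FetchOffsetSketch();
        segment.addBatch(new Batch(308050, 308058), true);
        segment.addBatch(new Batch(308059, 308060), false); // two records in one batch, not indexed

        Batch fetched = segment.readFrom(308060);
        System.out.println(fetched.baseOffset + ".." + fetched.lastOffset); // prints 308059..308060

        try {
            checkFollowerAppend(308060, fetched);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With an index entry at 308050 and a two-record batch 308059..308060, a fetch at 308060 returns the batch starting at 308059, and a follower whose log end offset is 308060 rejects it. Under these assumptions, a follower that only ever appends whole batches from the leader would have its log end offset on one of the leader's batch boundaries, so landing in the middle of a batch suggests the two logs have diverged, in line with the KAFKA-6361 explanation in the comment.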