[ https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14254551#comment-14254551 ]
lokesh Birla commented on KAFKA-1806: ------------------------------------- Hello Neha, I did further debugging on this by turning trace on and found the following. 1. Broker 1 and broker 3 have different offset for partition 0 for topic mmetopic1. Broker 1 has higher offset than Broker 3. 2. Due to kafka leadership changed, Broker 3 became the leader which has lower offset and when Broker 1 send fetch request with higher offset, error comes from broker 3 since it does NOT have that higher offset. Here is important trace information. Broker 1 (log) [2014-09-02 06:53:55,466] DEBUG Partition [mmetopic1,0] on broker 1: Old hw for partition [mmetopic1,0] is 1330329. New hw is 1330329. All leo's are 1371212,1330329,1331850 (kafka.cluster.Partition)[2014-09-02 06:53:55,537] INFO Truncating log mmetopic1-0 to offset 1329827. (kafka.log.Log) [2014-09-02 06:53:55,477] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer, [[mmetopic1,0], initOffset 1330329 to broker id:3,host:10.1.130.1,port:9092] ) (kafka.server.ReplicaFetcherManager [2014-09-02 06:53:55,479] TRACE [KafkaApi-1] Handling request: Name:UpdateMetadataRequest;Version:0;Controller:2;ControllerEpoch:2;CorrelationId:5;ClientId:id_2-host_null-port_9092;AliveBrokers:id:3,host:10.1.130.1,port:9092,id:2,host:10.1.129.1,port:9092,id:1,host:10.1.128.1,port:9092;PartitionState:[mmetopic1,0] -> (LeaderAndIsrInfo:(Leader:3,ISR:3,2,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:3),AllReplicas:1,2,3) from client: /10.1.128.1:59805 (kafka.server.KafkaApis) [2014-09-02 06:53:55,490] TRACE [ReplicaFetcherThread-0-3], issuing to broker 3 of fetch request Name: FetchRequest; Version: 0; CorrelationId: 3687; ClientId: ReplicaFetcherThread-0-3; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [mmetopic1,0] -> PartitionFetchInfo(1330329,2097152) (kafka.server.ReplicaFetcherThread) [2014-09-02 06:53:55,543] WARN [ReplicaFetcherThread-0-3], Replica 1 for partition [mmetopic1,0] reset its fetch offset to current leader 3's latest offset 1329827 (kafka.server.ReplicaFetcherThread) [2014-09-02 06:53:55,543] ERROR [ReplicaFetcherThread-0-3], Current offset 1330329 for partition [mmetopic1,0] out of range; reset offset to 1329827 (kafka.server.ReplicaFetcherThread) Broker 3 (log) [2014-09-02 06:53:06,525] TRACE Setting log end offset for replica 2 for partition [mmetopic1,0] to 1330329 (kafka.cluster.Replica) [2014-09-02 06:53:06,526] DEBUG Partition [mmetopic1,0] on broker 3: Old hw for partition [mmetopic1,0] is 1329827. New hw is 1329827. All leo's are 1329827,1330329 (kafka.cluster.Partition) ========================================================================================================= [2014-09-02 06:53:06,530] ERROR [KafkaApi-3] Error when processing fetch request for partition [mmetopic1,0] offset 1330329 from follower with correlation id 3686 (kafka.server.KafkaApis) kafka.common.OffsetOutOfRangeException: Request for offset 1330329 but we only have log segments in the range 0 to 1329827. at kafka.log.Log.read(Log.scala:380) at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) at scala.collection.immutable.Map$Map3.foreach(Map.scala:164) at scala.collection.TraversableLike$class.map(TraversableLike.scala:233) at scala.collection.immutable.Map$Map3.map(Map.scala:144) at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471) at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437) at kafka.server.KafkaApis.handle(KafkaApis.scala:186) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:745) ========================================================================================== > broker can still expose uncommitted data to a consumer > ------------------------------------------------------ > > Key: KAFKA-1806 > URL: https://issues.apache.org/jira/browse/KAFKA-1806 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.8.1.1 > Reporter: lokesh Birla > Assignee: Neha Narkhede > > Although following issue: https://issues.apache.org/jira/browse/KAFKA-727 > is marked fixed but I still see this issue in 0.8.1.1. I am able to > reproducer the issue consistently. > [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch > request for partition [mmetopic4,2] offset 1940029 from consumer with > correlation id 21 (kafka.server.Kaf > kaApis) > java.lang.IllegalArgumentException: Attempt to read with a maximum offset > (1818353) less than the start offset (1940029). > at kafka.log.LogSegment.read(LogSegment.scala:136) > at kafka.log.Log.read(Log.scala:386) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:119) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:233) > at scala.collection.immutable.Map$Map1.map(Map.scala:107) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471) > at > kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783) > at > kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765) > at > kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332)