[
https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14254551#comment-14254551
]
lokesh Birla commented on KAFKA-1806:
-------------------------------------
Hello Neha,
I debugged this further with trace logging turned on and found the following.
1. Broker 1 and Broker 3 have different offsets for partition 0 of topic
mmetopic1. Broker 1 has a higher offset than Broker 3.
2. After a Kafka leadership change, Broker 3, which has the lower offset, became
the leader. When Broker 1 sends a fetch request with its higher offset, Broker 3
returns an error because it does NOT have that offset.
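
To make the two points above concrete, here is a minimal Python sketch of the
offset check as I understand it from the trace. It is a simplified model, not
Kafka's actual code: the names FetchOutcome and follower_fetch are hypothetical,
and the numbers are the offsets that appear in the Broker 1 and Broker 3 logs
for [mmetopic1,0].

```python
from dataclasses import dataclass


@dataclass
class FetchOutcome:
    in_range: bool          # False models the OffsetOutOfRangeException case
    next_fetch_offset: int  # offset the follower will fetch from next
    discarded: int          # offsets the follower gives up by resetting


def follower_fetch(fetch_offset: int, leader_log_end: int) -> FetchOutcome:
    """Hypothetical model of one replica fetch against the current leader."""
    if fetch_offset <= leader_log_end:
        # The leader has this offset (or the follower is fully caught up).
        return FetchOutcome(True, fetch_offset, 0)
    # The leader's log ends before the requested offset, so the follower
    # resets to the leader's latest offset and drops everything beyond it.
    return FetchOutcome(False, leader_log_end, fetch_offset - leader_log_end)


# Broker 1 fetches at 1330329; Broker 3 (the new leader) ends at 1329827.
outcome = follower_fetch(fetch_offset=1330329, leader_log_end=1329827)
print(outcome)
```

With these numbers the fetch is out of range, the follower resets to 1329827,
and 502 offsets that Broker 1 had are no longer in its log.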
Here is the important trace information.
Broker 1 (log)
[2014-09-02 06:53:55,466] DEBUG Partition [mmetopic1,0] on broker 1: Old hw for
partition [mmetopic1,0] is 1330329. New hw is 1330329. All leo's are
1371212,1330329,1331850 (kafka.cluster.Partition)
[2014-09-02 06:53:55,537] INFO Truncating log mmetopic1-0 to offset 1329827.
(kafka.log.Log)
[2014-09-02 06:53:55,477] INFO [ReplicaFetcherManager on broker 1] Added
fetcher for partitions ArrayBuffer, [[mmetopic1,0], initOffset 1330329 to
broker id:3,host:10.1.130.1,port:9092] ) (kafka.server.ReplicaFetcherManager)
[2014-09-02 06:53:55,479] TRACE [KafkaApi-1] Handling request:
Name:UpdateMetadataRequest;Version:0;Controller:2;ControllerEpoch:2;CorrelationId:5;ClientId:id_2-host_null-port_9092;AliveBrokers:id:3,host:10.1.130.1,port:9092,id:2,host:10.1.129.1,port:9092,id:1,host:10.1.128.1,port:9092;PartitionState:[mmetopic1,0]
->
(LeaderAndIsrInfo:(Leader:3,ISR:3,2,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:3),AllReplicas:1,2,3)
from client: /10.1.128.1:59805 (kafka.server.KafkaApis)
[2014-09-02 06:53:55,490] TRACE [ReplicaFetcherThread-0-3], issuing to broker 3
of fetch request Name: FetchRequest; Version: 0; CorrelationId: 3687; ClientId:
ReplicaFetcherThread-0-3; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
RequestInfo: [mmetopic1,0] -> PartitionFetchInfo(1330329,2097152)
(kafka.server.ReplicaFetcherThread)
[2014-09-02 06:53:55,543] WARN [ReplicaFetcherThread-0-3], Replica 1 for
partition [mmetopic1,0] reset its fetch offset to current leader 3's latest
offset 1329827 (kafka.server.ReplicaFetcherThread)
[2014-09-02 06:53:55,543] ERROR [ReplicaFetcherThread-0-3], Current offset
1330329 for partition [mmetopic1,0] out of range; reset offset to 1329827
(kafka.server.ReplicaFetcherThread)
Broker 3 (log)
[2014-09-02 06:53:06,525] TRACE Setting log end offset for replica 2 for
partition [mmetopic1,0] to 1330329 (kafka.cluster.Replica)
[2014-09-02 06:53:06,526] DEBUG Partition [mmetopic1,0] on broker 3: Old hw for
partition [mmetopic1,0] is 1329827. New hw is 1329827. All leo's are
1329827,1330329 (kafka.cluster.Partition)
=========================================================================================================
[2014-09-02 06:53:06,530] ERROR [KafkaApi-3] Error when processing fetch
request for partition [mmetopic1,0] offset 1330329 from follower with
correlation id 3686 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 1330329 but we only
have log segments in the range 0 to 1329827.
at kafka.log.Log.read(Log.scala:380)
at
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
at
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
at
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at scala.collection.immutable.Map$Map3.map(Map.scala:144)
at
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:745)
==========================================================================================
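
The hw values in the DEBUG lines above are consistent with the high watermark
being the smallest log end offset (leo) among the replicas listed on each line.
The short Python check below only verifies that arithmetic against the logged
numbers. The helper name high_watermark is hypothetical, and treating hw as a
plain minimum is my simplifying assumption, not a copy of the broker logic.

```python
from typing import Iterable


def high_watermark(log_end_offsets: Iterable[int]) -> int:
    """Assumed rule: hw is the minimum leo across the listed replicas."""
    return min(log_end_offsets)


# Broker 1 log: "All leo's are 1371212,1330329,1331850", logged hw 1330329.
broker1_hw = high_watermark([1371212, 1330329, 1331850])

# Broker 3 log: "All leo's are 1329827,1330329", logged hw 1329827.
broker3_hw = high_watermark([1329827, 1330329])

print(broker1_hw, broker3_hw)  # prints "1330329 1329827"
```

Both results match the "New hw" values in the corresponding log lines, and the
two brokers disagree on hw by 502 offsets.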
> broker can still expose uncommitted data to a consumer
> ------------------------------------------------------
>
> Key: KAFKA-1806
> URL: https://issues.apache.org/jira/browse/KAFKA-1806
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8.1.1
> Reporter: lokesh Birla
> Assignee: Neha Narkhede
>
> Although the following issue, https://issues.apache.org/jira/browse/KAFKA-727,
> is marked fixed, I still see it in 0.8.1.1. I am able to
> reproduce the issue consistently.
> [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch
> request for partition [mmetopic4,2] offset 1940029 from consumer with
> correlation id 21 (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset
> (1818353) less than the start offset (1940029).
> at kafka.log.LogSegment.read(LogSegment.scala:136)
> at kafka.log.Log.read(Log.scala:386)
> at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> at scala.collection.immutable.Map$Map1.map(Map.scala:107)
> at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> at
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
> at
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
> at
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> at java.lang.Thread.run(Thread.java:745)