[ 
https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14254551#comment-14254551
 ] 

lokesh Birla commented on KAFKA-1806:
-------------------------------------

Hello Neha,

I debugged this further with trace logging turned on and found the following. 

1. Broker 1 and broker 3 have different offsets for partition 0 of topic 
mmetopic1. Broker 1 has a higher offset than broker 3.
2. After a Kafka leadership change, broker 3, which has the lower offset, 
became the leader. When broker 1 then sends a fetch request with its higher 
offset, broker 3 returns an error because it does NOT have that offset.
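The sequence in points 1 and 2 can be sketched as a toy model. This is plain Python, not Kafka code: `ToyReplica`, `read` and `fetch_from` are hypothetical names made up for illustration, and assigning the log end offset 1371212 to broker 1 is an assumption read off the trace below. The fetch offset 1330329 and the leader's latest offset 1329827 are the values from the logs.

```python
class OffsetOutOfRange(Exception):
    """Raised by the toy leader when a fetch offset is past its log end."""


class ToyReplica:
    """Hypothetical stand-in for one broker's replica of one partition."""

    def __init__(self, broker_id, log_end_offset):
        self.broker_id = broker_id
        self.log_end_offset = log_end_offset

    def read(self, fetch_offset):
        # Leader side: reject offsets beyond what this log holds.
        if fetch_offset > self.log_end_offset:
            raise OffsetOutOfRange(
                f"Request for offset {fetch_offset} but we only have log "
                f"segments in the range 0 to {self.log_end_offset}."
            )
        return range(fetch_offset, self.log_end_offset)

    def fetch_from(self, leader, fetch_offset):
        # Follower side: on an out-of-range error, fall back to the
        # leader's latest offset and truncate the local log to match.
        try:
            leader.read(fetch_offset)
            return fetch_offset
        except OffsetOutOfRange:
            self.log_end_offset = leader.log_end_offset
            return leader.log_end_offset


broker1 = ToyReplica(1, 1371212)  # assumed: ahead of the new leader
broker3 = ToyReplica(3, 1329827)  # new leader with the lower offset
next_offset = broker1.fetch_from(broker3, 1330329)
print(next_offset, broker1.log_end_offset)  # prints "1329827 1329827"
```

In this model everything broker 1 held past offset 1329827 is dropped once it resets, which is consistent with the "Truncating log" and "reset offset" lines in the trace.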

Here is the relevant trace information. 

Broker 1 (log)

[2014-09-02 06:53:55,466] DEBUG Partition [mmetopic1,0] on broker 1: Old hw for 
partition [mmetopic1,0] is 1330329. New hw is 1330329. All leo's are 
1371212,1330329,1331850 (kafka.cluster.Partition)
[2014-09-02 06:53:55,537] INFO Truncating log mmetopic1-0 to offset 1329827. 
(kafka.log.Log)
[2014-09-02 06:53:55,477] INFO [ReplicaFetcherManager on broker 1] Added 
fetcher for partitions ArrayBuffer, [[mmetopic1,0], initOffset 1330329 to 
broker id:3,host:10.1.130.1,port:9092] ) (kafka.server.ReplicaFetcherManager)
[2014-09-02 06:53:55,479] TRACE [KafkaApi-1] Handling request: 
Name:UpdateMetadataRequest;Version:0;Controller:2;ControllerEpoch:2;CorrelationId:5;ClientId:id_2-host_null-port_9092;AliveBrokers:id:3,host:10.1.130.1,port:9092,id:2,host:10.1.129.1,port:9092,id:1,host:10.1.128.1,port:9092;PartitionState:[mmetopic1,0]
 -> 
(LeaderAndIsrInfo:(Leader:3,ISR:3,2,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:3),AllReplicas:1,2,3)
 from client: /10.1.128.1:59805 (kafka.server.KafkaApis)
[2014-09-02 06:53:55,490] TRACE [ReplicaFetcherThread-0-3], issuing to broker 3 
of fetch request Name: FetchRequest; Version: 0; CorrelationId: 3687; ClientId: 
ReplicaFetcherThread-0-3; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [mmetopic1,0] -> PartitionFetchInfo(1330329,2097152) 
(kafka.server.ReplicaFetcherThread)
[2014-09-02 06:53:55,543] WARN [ReplicaFetcherThread-0-3], Replica 1 for 
partition [mmetopic1,0] reset its fetch offset to current leader 3's latest 
offset 1329827 (kafka.server.ReplicaFetcherThread)
[2014-09-02 06:53:55,543] ERROR [ReplicaFetcherThread-0-3], Current offset 
1330329 for partition [mmetopic1,0] out of range; reset offset to 1329827 
(kafka.server.ReplicaFetcherThread)

Broker 3 (log)
[2014-09-02 06:53:06,525] TRACE Setting log end offset for replica 2 for 
partition [mmetopic1,0] to 1330329 (kafka.cluster.Replica)
[2014-09-02 06:53:06,526] DEBUG Partition [mmetopic1,0] on broker 3: Old hw for 
partition [mmetopic1,0] is 1329827. New hw is 1329827. All leo's are 
1329827,1330329 (kafka.cluster.Partition)
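
For comparing the two brokers, the DEBUG lines from kafka.cluster.Partition can be pulled apart with a small script. This is only a sketch: `parse_hw_line` and the regular expression are hypothetical helpers written against the two lines quoted in this message, not a general Kafka log parser. The comparison of the high watermark with the smallest LEO rests on my understanding that the leader sets the hw to the minimum LEO of the replicas it lists, which both lines here happen to satisfy.

```python
import re

# Pattern written against the two DEBUG lines in this message only.
HW_LINE = re.compile(
    r"on broker (?P<broker>\d+): Old hw for partition \[[^\]]+\] "
    r"is (?P<old_hw>\d+)\. New hw is (?P<new_hw>\d+)\. "
    r"All leo's are (?P<leos>[\d,]+)"
)


def parse_hw_line(line):
    """Extract broker id, high watermarks and LEOs from one DEBUG line."""
    # Mail clients hard-wrap long log lines, so collapse whitespace first.
    match = HW_LINE.search(" ".join(line.split()))
    if match is None:
        return None
    return {
        "broker": int(match["broker"]),
        "old_hw": int(match["old_hw"]),
        "new_hw": int(match["new_hw"]),
        "leos": [int(x) for x in match["leos"].split(",")],
    }


broker1_line = """[2014-09-02 06:53:55,466] DEBUG Partition [mmetopic1,0] on broker 1: Old hw for
partition [mmetopic1,0] is 1330329. New hw is 1330329. All leo's are
1371212,1330329,1331850 (kafka.cluster.Partition)"""

broker3_line = """[2014-09-02 06:53:06,526] DEBUG Partition [mmetopic1,0] on broker 3: Old hw for
partition [mmetopic1,0] is 1329827. New hw is 1329827. All leo's are
1329827,1330329 (kafka.cluster.Partition)"""

b1 = parse_hw_line(broker1_line)
b3 = parse_hw_line(broker3_line)
for entry in (b1, b3):
    # Columns: broker id, new hw, smallest LEO, largest LEO.
    print(entry["broker"], entry["new_hw"], min(entry["leos"]), max(entry["leos"]))
```

Run on the two lines above, this shows broker 1 reporting an hw of 1330329 while broker 3 reports 1329827, which is the mismatch described in point 1.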




=========================================================================================================

[2014-09-02 06:53:06,530] ERROR [KafkaApi-3] Error when processing fetch 
request for partition [mmetopic1,0] offset 1330329 from follower with 
correlation id 3686 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 1330329 but we only 
have log segments in the range 0 to 1329827.
        at kafka.log.Log.read(Log.scala:380)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
        at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
        at scala.collection.immutable.Map$Map3.map(Map.scala:144)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
        at java.lang.Thread.run(Thread.java:745)

==========================================================================================



> broker can still expose uncommitted data to a consumer
> ------------------------------------------------------
>
>                 Key: KAFKA-1806
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1806
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: lokesh Birla
>            Assignee: Neha Narkhede
>
> Although the following issue, https://issues.apache.org/jira/browse/KAFKA-727,
> is marked fixed, I still see it in 0.8.1.1. I am able to 
> reproduce the issue consistently. 
> [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch 
> request for partition [mmetopic4,2] offset 1940029 from consumer with 
> correlation id 21 (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
> (1818353) less than the start offset (1940029).
>         at kafka.log.LogSegment.read(LogSegment.scala:136)
>         at kafka.log.Log.read(Log.scala:386)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
>         at scala.collection.immutable.Map$Map1.map(Map.scala:107)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
>         at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
>         at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
>         at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
>         at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
