Hi Jim,

When messages get cleaned up under the data retention policy (by time or by
size), the brokers do not notify ZK of the deletion. The underlying
assumption is that consumers fetching near the tail of the log (i.e. not
lagging much, which is the normal case) are continuously updating their
consumed offsets in ZK, so those offsets will be valid most of the time.
When consumers lag behind and the old messages are cleaned, they will get
this exception and need to handle it by resetting their offset to, e.g.,
the head of the log.
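
As a rough sketch of that reset logic, in Python since that is your client
language: the function name, its arguments and the "policy" values below are
hypothetical and not part of any Kafka client API. It assumes the client has
already learned the valid offset range for the partition, for example from
the range reported in the exception.

```python
def resolve_fetch_offset(stored, earliest, latest, policy="earliest"):
    """Pick the offset to fetch from next.

    stored   -- offset read back from ZK (may be stale)
    earliest -- oldest offset the broker still has for the partition
    latest   -- log end offset of the partition
    policy   -- hypothetical knob: where to jump when `stored` is invalid
    """
    # The stored offset is still inside the retained range: keep it.
    if earliest <= stored <= latest:
        return stored
    # Otherwise the offset is out of range (e.g. the messages were aged
    # out), so reset according to the chosen policy.
    if policy == "earliest":
        return earliest
    if policy == "latest":
        return latest
    raise ValueError("unknown reset policy: %r" % (policy,))


# Numbers taken from the broker error in your mail.
new_offset = resolve_fetch_offset(8013827, 8603331, 11279773)
print(new_offset)
```

Resetting to the earliest offset re-reads everything still retained and
skips only what was already deleted; resetting to the latest offset skips the
whole backlog. Either way, the new offset should be written back to ZK so the
next restart does not hit the same error.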

How frequently do your clients read / write the offsets in ZK?

Guozhang

On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John <jj...@livefyre.com> wrote:

> Hello,
>
>   I understand what this error means, just not sure why I keep running into
> it after 24-48 hrs of running fine consuming > 300 messages / second.
>
>   What happens when a kafka log rolls over and some old records are aged
> out? I mean what happens to the offsets? We are using a python client which
> stores the offsets in ZK. But in the middle of the run, say after 2 days or
> so, suddenly it gets this error.
>
> The only possibility is that the older records have aged off and ZK still
> has the offset which is no longer applicable...How does the java client
> deal with this? Does kafka inform ZK that records have been aged off and
> update the offset or something?
>
> Here is the error I see in the broker logs:
>
> [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing fetch request for partition [activity.stream,3] offset 8013827 from consumer with correlation id 73 (kafka.server.KafkaApis)
> kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but we only have log segments in the range 8603331 to 11279773.
>      at kafka.log.Log.read(Log.scala:380)
>      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
>      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
>      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>      at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
>      at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
>      at scala.collection.immutable.Map$Map3.map(Map.scala:144)
>      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
>      at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
>      at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
>      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>      at java.lang.Thread.run(Thread.java:745)
>
>
> thx
>
> Jim
>



-- 
-- Guozhang
