The bottom line is that you are likely not consuming messages fast enough,
so you are falling behind. You end up reading steadily older and older
messages, and eventually you request messages older than the retention
window configured on your Kafka broker. That's the typical scenario for
this error.
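
The usual way out, as a rough sketch: when the broker reports that your
committed offset is outside its available segment range, reset to the
earliest (or latest) valid offset and resume from there. The `clamp_offset`
helper below is hypothetical, not part of kafka-python or any real client
library; the range numbers mirror the broker error quoted later in this
thread.

```python
def clamp_offset(committed, log_start, log_end):
    """Return a valid fetch offset given the broker's available range.

    Hypothetical helper (not a real client API). If the committed offset
    has aged out of retention, fall back to the earliest offset the broker
    still has (log_start); if it somehow runs past the end of the log,
    fall back to log_end.
    """
    if committed < log_start:
        return log_start   # old segments were deleted; skip ahead
    if committed > log_end:
        return log_end     # offset beyond the log end; rewind
    return committed       # still valid, fetch as-is

# Using the numbers from the error quoted below: committed offset 8013827,
# but the broker only has segments 8603331..11279773.
print(clamp_offset(8013827, 8603331, 11279773))  # 8603331
```

Note that resetting to the earliest offset silently skips the messages that
aged out; whether to reset to earliest or latest depends on whether you can
tolerate reprocessing gaps or missed data.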

Also, if you are consuming 300 messages/sec but committing once every 100
messages, you are committing 3 times a second, which could be a bottleneck.
For comparison, the default auto-commit interval is 60 seconds.
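
One way to cut that commit overhead, sketched under the assumption of a
generic ZK-backed consumer loop (the `CommitThrottle` class is hypothetical,
not from any Kafka client): commit only when a message count or a time
interval is exceeded, whichever comes first.

```python
import time

class CommitThrottle:
    """Hypothetical helper: signal a commit only every `max_messages`
    messages or every `max_seconds` seconds, whichever comes first,
    instead of on a small fixed message count."""

    def __init__(self, max_messages=1000, max_seconds=60.0,
                 clock=time.monotonic):
        self.max_messages = max_messages
        self.max_seconds = max_seconds
        self.clock = clock          # injectable for testing
        self.pending = 0
        self.last_commit = self.clock()

    def should_commit(self):
        """Call once per consumed message; True means commit now."""
        self.pending += 1
        if (self.pending >= self.max_messages or
                self.clock() - self.last_commit >= self.max_seconds):
            self.pending = 0
            self.last_commit = self.clock()
            return True
        return False
```

At 300 messages/sec with these defaults, this commits roughly every 3.3
seconds (on the message count) instead of 3 times a second; the time bound
still caps how much you would re-read after a crash.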

Jason

On Fri, Nov 7, 2014 at 1:30 PM, Jimmy John <jj...@livefyre.com> wrote:

> The current setting is to commit to ZK every 100 messages read.
>
> The read buffer size is 262144 bytes. So we will read in a bunch of
> messages in a batch. And while iterating through those messages, we commit
> the offset to ZK every 100.
>
> jim
>
> On Fri, Nov 7, 2014 at 10:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Jim,
> >
> > When messages get cleaned up based on the data retention policy (by time
> > or by size), the brokers do not inform ZK of the deletion event. The
> > underlying assumption is that when consumers are fetching data near the
> > tail of the log (i.e. they are not lagging much, which is the normal
> > case), they continuously update the consumed offsets in ZK, and hence
> > those offsets will be valid most of the time. When consumers are lagging
> > behind and the old messages are cleaned up, they will get this exception,
> > and they need to handle it by resetting their offset to, e.g., the head
> > of the log.
> >
> > How frequently do your clients read/write the offsets in ZK?
> >
> > Guozhang
> >
> > On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John <jj...@livefyre.com> wrote:
> >
> > > Hello,
> > >
> > >   I understand what this error means; I'm just not sure why I keep
> > > running into it after 24-48 hrs of running fine, consuming > 300
> > > messages / second.
> > >
> > >   What happens when a Kafka log rolls over and some old records are
> > > aged out? I mean, what happens to the offsets? We are using a Python
> > > client which stores the offsets in ZK. But in the middle of the run,
> > > say after 2 days or so, it suddenly gets this error.
> > >
> > > The only possibility is that the older records have aged off and ZK
> > > still has an offset which is no longer applicable... How does the Java
> > > client deal with this? Does Kafka inform ZK that records have been aged
> > > off and update the offset, or something?
> > >
> > > Here is the error I see in the broker logs:
> > >
> > > [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing fetch
> > > request for partition [activity.stream,3] offset 8013827 from consumer
> > > with correlation id 73 (kafka.server.KafkaApis)
> > >
> > > kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but we
> > > only have log segments in the range 8603331 to 11279773.
> > >     at kafka.log.Log.read(Log.scala:380)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >     at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
> > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > >     at scala.collection.immutable.Map$Map3.map(Map.scala:144)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> > >     at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> > >     at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> > >     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > thx
> > >
> > > Jim
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
