OK, so I recreated this, because I wasn't sure whether the offsets were 
preserved when the data evaporated, or whether they somehow reset to zero:

        /opt/kafka/bin/kafka-topics.sh --zookeeper whatever \
          --topic test.deletion --replication-factor 1 --partitions 1 \
          --config "retention.ms=60000" --create

which makes a topic where messages are deleted after approximately 60 seconds.

   Then I published to the topic:

        /opt/kafka/bin/kafka-console-producer.sh --broker-list mybroker:9092 \
          --topic test.deletion
        type
        some
        stuff

   And I verified that I saw that stuff:

        /opt/kafka/bin/kafka-console-consumer.sh --zookeeper whatever \
          --topic test.deletion --from-beginning

(and I did: I got three messages -- "type", "some", and "stuff" -- at offsets 
0, 1, and 2, as I recall).

   Then I used a kafka-python program to poke at the topic, since I'm about a 
thousand times more familiar with the python API than with the Java one.  (See 
below.  You can get the python API at https://github.com/mumrah/kafka-python.)

   Then I could use this to print stuff in the topic, or tell it to go to a 
specific offset.  And sure enough, if I knew that (say) offset 2 existed, then 
I let it age out, then I ran this with "-o 2", I'd get the expected 
OffsetOutOfRangeException.

   But if I fed in more data, the next offset was used, as one would expect: 
that is, if I'd fed in messages until the offset of the current message was 2, 
then let the data age out, then fed in one more message, I could fetch the 
message at offset 3, no problem.  You could also see from the filenames of the 
log files for that topic that the most-recent offset is preserved -- not the 
message, of course, just the fact that the most-recent message was (say) #2.
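
   If you'd rather not poke at the log directory, you can also ask the broker 
directly for the earliest and latest offsets it still has for a partition.  
Here's a rough sketch with the same python library; the OffsetRequest / 
send_offset_request plumbing is from my reading of the kafka-python source 
rather than anything I've battle-tested, so check the names against the 
version you're running:

        # Ask the broker for the earliest and latest offsets for partition 0
        # of the topic.  time=-2 means "earliest available", -1 means "latest".
        from kafka.client import KafkaClient
        from kafka.common import OffsetRequest

        kafka = KafkaClient('mybroker:9092')
        (earliest,) = kafka.send_offset_request(
          [OffsetRequest('test.deletion', 0, -2, 1)])
        (latest,) = kafka.send_offset_request(
          [OffsetRequest('test.deletion', 0, -1, 1)])
        print earliest.offsets[0], latest.offsets[0]
        kafka.close()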

   If this did reset the offset to 0, I'd call that a bug, but it doesn't, so 
this is reasonable behavior, even if it's a little surprising.  In addition to 
what Philip suggested (increase retention, consume sooner), you could try 
catching this specific exception and doing more or less what's suggested here, 
using the SimpleConsumer API:

        
        https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

(look at "Finding Starting Offset For Reads").  It's interesting that the 
python API makes that quite a bit easier than the Java one does, since the 
python API gives you the handy-dandy seek() method, which behaves more or 
less like the seek() syscall in Unix.
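
   For example, here's roughly what catching the exception and resetting the 
offset might look like with kafka-python.  I haven't battle-tested this exact 
snippet: the exception class name (kafka.common.OffsetOutOfRangeError) and 
the seek() whence values (0 = relative to the earliest offset the broker 
still has, 2 = relative to the latest) are from my reading of the library 
source, so check them against your version.

        from kafka.client import KafkaClient
        from kafka.common import OffsetOutOfRangeError
        from kafka.consumer import SimpleConsumer

        kafka = KafkaClient('mybroker:9092')
        consumer = SimpleConsumer(client=kafka, group='wombat',
          topic='test.deletion', partitions=[0], auto_commit=False)

        while True:
            try:
                messages = consumer.get_messages(count=1000, block=True)
            except OffsetOutOfRangeError:
                # Priority is "get every message that's still there", so
                # jump to the earliest offset the broker still has; use
                # consumer.seek(0, 2) instead if you only care about the
                # newest messages.
                consumer.seek(0, 0)
                continue
            for message in messages:
                print message.message.value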

   Note that that section even makes a passing reference to messages aging out, 
as it says "Don't assume that offset 0 is the beginning offset, since messages 
age out of the log over time."

   I hope this helps.

        -Steve

(snip)
#!/usr/bin/python

import socket
import logging
# Should use argparse, but we shouldn't use python 2.6, either...
from optparse import OptionParser

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Uncomment to watch kafka-python's wire-level chatter.
#logging.basicConfig(level=logging.DEBUG)

def main():
    parser = OptionParser()
    parser.add_option('-t', '--topic', dest='toget',
      help='topic to which we should subscribe')
    parser.add_option('-p', '--partitions', dest='partlist',
      help='comma-separated list of partitions we should fetch from')
    parser.add_option('-b', '--broker', dest='kafkaHost',
      help='Kafka broker to which we should connect',
      default='mybroker')
    parser.add_option('-o', '--offset', dest='offset',
      help='offset of first message we should read')

    (options, args) = parser.parse_args()

    if options.toget:
        topic = options.toget
    else:
        topic = 'test.deletion'

    # Figure out which partitions to fetch from (default: just partition 0).
    partitions = []
    if options.partlist is not None and options.partlist != '':
        parts = options.partlist.split(',')
        for part in parts:
            partitions.append(int(part))
    else:
        partitions.append(0)

    kafka = KafkaClient('%s:9092' % options.kafkaHost)

    # Show which partitions the broker knows about for this topic.
    print kafka.topic_partitions[topic]

    # Throwaway consumer group; auto_commit is off so this script never
    # disturbs any real consumer's committed offsets.
    consumer = SimpleConsumer(client=kafka,
      group="wombat.%s" % socket.gethostname(),
      topic=topic, partitions=partitions,
      fetch_size_bytes = 1024 * 1024,
      auto_commit=False, buffer_size = 256 * 1024,
      max_buffer_size = 2048 * 1024)

    # Position the consumer before we start fetching; with no -o we start
    # from the beginning of whatever the broker still has.
    if options.offset:
        consumer.seek(int(options.offset), 0)
    else:
        consumer.seek(0, 0)

    # Fetch and print messages until interrupted.
    try:
        while True:
            messages = consumer.get_messages(count=1000, block=True)
            for message in messages:
                print message.message.value
                print message
    except KeyboardInterrupt:
        pass

    kafka.close()

if __name__ == "__main__":
    main()


On Wed, Aug 20, 2014 at 09:28:12AM -0700, Philip O'Toole wrote:
> It's not a bug, right? It's the way the system works (if I have been 
> following the thread correctly) -- when the retention time passes, the 
> message is gone. Either consume your messages sooner, or increase your 
> retention time. Kafka is not magic, it can only do what it's told.
> 
> In practice I have found compression to be a big help -- big savings on disk 
> space.
> 
> 
> Philip
> 
> -----------------------------------------
> http://www.philipotoole.com 
> 
> 
> On Wednesday, August 20, 2014 1:42 AM, "pradeep.si...@wipro.com" 
> <pradeep.si...@wipro.com> wrote:
>  
> 
> 
> Sure, I would try setting longer retention hours. But I feel this would 
> not be a good approach? Should we raise it as a bug?
> 
> Thanks,
> Pradeep Simha
> Technical Lead
> 
> -----Original Message-----
> From: Manjunath Shivakumar [mailto:manjunath.shivaku...@betfair.com]
> Sent: Wednesday, August 20, 2014 1:31 PM
> To: users@kafka.apache.org
> Subject: RE: Keep on getting kafka.common.OffsetOutOfRangeException: Random 
> times
> 
> We had a similar issue in our dev environments, where we had to configure 
> aggressive log retention to save space.
> And the clients kept failing with this error on Mondays, because the messages 
> from Friday had been deleted.
> Perhaps compaction would help in this scenario too?
> https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
> 
> ________________________________________
> From: Steve Miller [st...@idrathernotsay.com]
> Sent: 20 August 2014 08:47
> To: users@kafka.apache.org
> Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random 
> times
> 
> That seems likely.  I'd try either catching the exception and resetting the 
> offset, or upping log.retention.hours.  I'd be interested in hearing if that 
> fixes the problem.
> 
>     -Steve
> 
> > On Aug 19, 2014, at 11:54 PM, <pradeep.si...@wipro.com> wrote:
> >
> > Thank you for your reply. Oh, does retention hours have an effect on this? I 
> > didn't know this. I have log.retention.hours set to 1, and during 
> > development we test this once every 15 minutes or every hour or two. So do 
> > you think this is causing the issue?
> >
> > Thanks,
> > Pradeep Simha
> > Technical Lead
> >
> > -----Original Message-----
> > From: Steve Miller [mailto:st...@idrathernotsay.com]
> > Sent: Tuesday, August 19, 2014 6:13 PM
> > To: users@kafka.apache.org
> > Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException:
> > Random times
> >
> >   Also, what do you have log.retention.hours set to?  How often do you 
> >publish messages?
> >
> >   I can envision a scenario in which you don't publish to a topic often, 
> >and in fact publish so infrequently that everything in the topic ages out 
> >from log.retention.hours first.
> >
> >   I don't know exactly what happens should that occur, but I've seen some 
> >stuff that makes me think that the offsets might go back to zero -- or maybe 
> >they do if the broker restarts, so you might check to be sure that's not 
> >happening.
> >
> >   From what I've seen in that regard, I've been wondering if part of the 
> >way most long-running Kafka consumers should be designed is to catch that 
> >exception and either set their offset to the first available message or the 
> >last available message, depending on whether their priority is to get every 
> >message or to get the most recent messages.  Though in that scenario 
> >maybe it's that the first and last messages are the same by definition, since 
> >there aren't any messages left in the topic. (-:
> >
> >   It's also possible that the specific topic weirdness that my specific 
> >installation has been running into is causing that and it only happens for 
> >the stuff I work on, so definitely take this with a grain of salt; I'm no 
> >expert, just relating the local folklore.
> >
> >    -Steve
> >
> >> On Tue, Aug 19, 2014 at 09:12:30AM +0000, pradeep.si...@wipro.com wrote:
> >> Hi Team,
> >>
> >> Can someone please help me with this? This is really becoming a road block 
> >> for our project; we need to decide whether to continue to use Kafka or some 
> >> other project, as it is becoming too unstable.
> >>
> >> Thanks,
> >> Pradeep Simha
> >> Technical Lead
> >>
> >> -----Original Message-----
> >> From: pradeep.si...@wipro.com [mailto:pradeep.si...@wipro.com]
> >> Sent: Tuesday, August 19, 2014 9:30 AM
> >> To: users@kafka.apache.org
> >> Subject: RE: Keep on getting kafka.common.OffsetOutOfRangeException:
> >> Random times
> >>
> >> Hi Neha,
> >>
> >> Yes, I am using the latest version ie (0.8.1.1).
> >>
> >> Hi Guozhang,
> >>
> >> These are the values:
> >>
> >> #log.retention.bytes= 1073741824 (Yes, this was commented by default)
> >>
> >> log.retention.check.interval.ms=60000
> >>
> >> Am I doing anything wrong here? Since I haven't touched this properties 
> >> file.
> >>
> >> Thanks,
> >> Pradeep Simha
> >> Technical Lead
> >>
> >> -----Original Message-----
> >> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> >> Sent: Tuesday, August 19, 2014 2:27 AM
> >> To: users@kafka.apache.org
> >> Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException:
> >> Random times
> >>
> >> Also, what version of Kafka are you using? 0.8.1.1 is the latest most 
> >> stable version.
> >>
> >>
> >>> On Mon, Aug 18, 2014 at 9:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>> Hi Pradeep,
> >>>
> >>> It seems your logs get deleted due to the retention policy. Could you
> >>> check the config values for log.retention.bytes and
> >>> log.retention.check.interval.ms?
> >>>
> >>> http://kafka.apache.org/documentation.html#brokerconfigs
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>> On Mon, Aug 18, 2014 at 5:49 AM, <pradeep.si...@wipro.com> wrote:
> >>>>
> >>>> Hi Team,
> >>>>
> >>>> Of late I am facing a strange issue w.r.t. Kafka. At random times I keep
> >>>> on getting these strange errors while consuming the topic:
> >>>>
> >>>>
> >>>> kafka.common.OffsetOutOfRangeException: Request for offset 19 but
> >>>> we only have log segments in the range 0 to 0.
> >>>> Sometimes I get like this:
> >>>>
> >>>>
> >>>> kafka.common.OffsetOutOfRangeException: Request for offset 19 but
> >>>> we only have log segments in the range 19 to 22.
> >>>>
> >>>> That number keeps on changing (with random ranges). I don't know
> >>>> what the problem is here. Both producer and consumer will work
> >>>> perfectly, but I keep on getting these errors randomly. In that
> >>>> situation, if I clear the logs and remove the broker, it starts working 
> >>>> fine again.
> >>>>
> >>>> Can anyone please help me in this regard? This is affecting our
> >>>> application stability. If any more information is required I can
> >>>> provide it; also, we are using only the defaults provided by Kafka,
> >>>> we didn't change any settings.
> >>>>
> >>>> Thanks,
> >>>> Pradeep Simha
> >>>> Technical Lead
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang