The log end offset is just the end of the committed messages in the log
(the last thing the consumer has access to). It isn't the same as the
cleaner point but is always later than it so it would work just as well.

-Jay

On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <w.f.funn...@gmail.com> wrote:

> > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > along the lines of: we expose the log-end-offset (actually the high
> > watermark) of the partition in the fetch response. However, this is
> > not exposed to the consumer (either in the new ConsumerRecord class
> > or the existing MessageAndMetadata class). If we did, then if you
> > were to consume a record you can check that it has offsets up to the
> > log-end offset. If it does then you would know for sure that you have
> > consumed everything for that partition
>
> To confirm then, the log-end-offset is the same as the cleaner point?
>
>
>
> On 19 February 2015 at 03:10, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Yeah I was thinking either along the lines Joel was suggesting or else
> > adding a logEndOffset(TopicPartition) method or something like that. As
> > Joel says the consumer actually has this information internally (we
> > return it with the fetch response) but doesn't expose it.
> >
> > -Jay
> >
> > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> > > > > 2. Make the log end offset available more easily in the consumer.
> > > >
> > > > > Was thinking something would need to be added in LogCleanerManager,
> > > > > in the updateCheckpoints function. Where would be best to publish
> > > > > the information to make it more easily available, or would you just
> > > > > expose the cleaner-offset-checkpoint file as it is?
> > > > > Is it right you would also need to know which
> > > > > cleaner-offset-checkpoint entry relates to each active partition?
> > >
> > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > along the lines of: we expose the log-end-offset (actually the high
> > > watermark) of the partition in the fetch response. However, this is
> > > not exposed to the consumer (either in the new ConsumerRecord class
> > > or the existing MessageAndMetadata class). If we did, then if you
> > > were to consume a record you can check that it has offsets up to the
> > > log-end offset. If it does then you would know for sure that you have
> > > consumed everything for that partition.
> > >
> > > > > Yes, was looking at this initially, but as we have 100-150 writes
> > > > > per second, it could be a while before there is a pause long enough
> > > > > to check it has caught up. Even with the consumer timeout set to -1,
> > > > > it takes some time to query the max offset values, which is still
> > > > > long enough for more messages to arrive.
> > >
> > > Got it - thanks for clarifying.
> > >
> > > >
> > > >
> > > >
> > > > > On 18 February 2015 at 23:16, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > >
> > > > > > > You are also correct and perceptive to notice that if you check
> > > > > > > the end of the log then begin consuming and read up to that point
> > > > > > > compaction may have already kicked in (if the reading takes a
> > > > > > > while) and hence you might have an incomplete snapshot.
> > > > >
> > > > > > Isn't it sufficient to just repeat the check at the end after
> > > > > > reading the log and repeat until you are truly done? At least for
> > > > > > the purposes of a snapshot?
> > > > >
> > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > > If you catch up off a compacted topic and keep consuming then you
> > > > > > > will become consistent with the log.
> > > > > >
> > > > > > > I think what you are saying is that you want to create a snapshot
> > > > > > > from the Kafka topic but NOT do continual reads after that point.
> > > > > > > For example you might be creating a backup of the data to a file.
> > > > > >
> > > > > > > I agree that this isn't as easy as it could be. As you say the
> > > > > > > only solution we have is that timeout which doesn't differentiate
> > > > > > > between GC stall in your process and no more messages left so you
> > > > > > > would need to tune the timeout. This is admittedly kind of a hack.
> > > > > >
> > > > > > > You are also correct and perceptive to notice that if you check
> > > > > > > the end of the log then begin consuming and read up to that point
> > > > > > > compaction may have already kicked in (if the reading takes a
> > > > > > > while) and hence you might have an incomplete snapshot.
> > > > > >
> > > > > > > I think there are two features we could add that would make this
> > > > > > > easier:
> > > > > > > 1. Make the cleaner point configurable on a per-topic basis. This
> > > > > > > feature would allow you to control how long the full log is
> > > > > > > retained and when compaction can kick in. This would give a
> > > > > > > configurable SLA for the reader process to catch up.
> > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > >
> > > > > >
> > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <w.f.funn...@gmail.com> wrote:
> > > > > >
> > > > > > > > We are currently using Kafka 0.8.1.1 with log compaction in
> > > > > > > > order to provide streams of messages to our clients.
> > > > > > >
> > > > > > > > As well as constantly consuming the stream, one of our use
> > > > > > > > cases is to provide a snapshot, meaning the user will receive a
> > > > > > > > copy of every message at least once.
> > > > > > >
> > > > > > > > Each one of these messages represents an item of content in our
> > > > > > > > system.
> > > > > > >
> > > > > > >
> > > > > > > > The problem comes when determining if the client has actually
> > > > > > > > reached the end of the topic.
> > > > > > >
> > > > > > > > The standard Kafka way of dealing with this seems to be by
> > > > > > > > using a ConsumerTimeoutException, but we are frequently getting
> > > > > > > > this error when the end of the topic has not been reached, or it
> > > > > > > > may even take a long time before a timeout naturally occurs.
> > > > > > >
> > > > > > >
> > > > > > > > On first glance it would seem possible to do a lookup for the
> > > > > > > > max offset for each partition when you begin consuming, stopping
> > > > > > > > when this position is reached.
> > > > > > >
> > > > > > > > But log compaction means that if an update to a piece of
> > > > > > > > content arrives with the same message key, then this will be
> > > > > > > > written to the end so the snapshot will be incomplete.
> > > > > > >
> > > > > > >
> > > > > > > > Another thought is to make use of the cleaner point. Currently
> > > > > > > > Kafka writes out to a "cleaner-offset-checkpoint" file in each
> > > > > > > > data directory which is written to after log compaction
> > > > > > > > completes.
> > > > > > >
> > > > > > > > If the consumer was able to access the cleaner-offset-checkpoint
> > > > > > > > you would be able to consume up to this point, check the point
> > > > > > > > was still the same, and compaction had not yet occurred, and
> > > > > > > > therefore determine you had received everything at least once.
> > > > > > > > (Assuming there was no race condition between compaction and
> > > > > > > > writing to the file)
> > > > > > >
> > > > > > >
> > > > > > > Has anybody got any thoughts?
> > > > > > >
> > > > > > > Will
> > > > > > >
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Will Funnell
> > >
> > >
> >
>
>
>
> --
> Will Funnell
>
