> > 2. Make the log end offset available more easily in the consumer.
>
> Was thinking something would need to be added in LogCleanerManager, in the
> updateCheckpoints function. Where would be best to publish the information
> to make it more easily available, or would you just expose the
> cleaner-offset-checkpoint file as it is?
> Is it right you would also need to know which cleaner-offset-checkpoint
> entry relates to each active partition?

I'm not sure if I misunderstood Jay's suggestion, but I think it is
along the lines of: we expose the log-end-offset (actually the high
watermark) of the partition in the fetch response. However, this is
not exposed to the consumer (either in the new ConsumerRecord class
or the existing MessageAndMetadata class). If we did, then if you
were to consume a record you can check that it has offsets up to the
log-end offset. If it does then you would know for sure that you have
consumed everything for that partition.

> Yes, was looking at this initially, but as we have 100-150 writes per
> second, it could be a while before there is a pause long enough to check it
> has caught up. Even with the consumer timeout set to -1, it takes some time
> to query the max offset values, which is still long enough for more
> messages to arrive.

Got it - thanks for clarifying.

>
> On 18 February 2015 at 23:16, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> > > You are also correct and perceptive to notice that if you check the end of
> > > the log then begin consuming and read up to that point compaction may have
> > > already kicked in (if the reading takes a while) and hence you might have
> > > an incomplete snapshot.
> >
> > Isn't it sufficient to just repeat the check at the end after reading
> > the log and repeat until you are truly done? At least for the purposes
> > of a snapshot?
> >
> > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > If you catch up off a compacted topic and keep consuming then you will
> > > become consistent with the log.
> > >
> > > I think what you are saying is that you want to create a snapshot from the
> > > Kafka topic but NOT do continual reads after that point. For example you
> > > might be creating a backup of the data to a file.
> > >
> > > I agree that this isn't as easy as it could be.
> > > As you say the only
> > > solution we have is that timeout which doesn't differentiate between GC
> > > stall in your process and no more messages left so you would need to tune
> > > the timeout. This is admittedly kind of a hack.
> > >
> > > You are also correct and perceptive to notice that if you check the end of
> > > the log then begin consuming and read up to that point compaction may have
> > > already kicked in (if the reading takes a while) and hence you might have
> > > an incomplete snapshot.
> > >
> > > I think there are two features we could add that would make this easier:
> > > 1. Make the cleaner point configurable on a per-topic basis. This feature
> > > would allow you to control how long the full log is retained and when
> > > compaction can kick in. This would give a configurable SLA for the reader
> > > process to catch up.
> > > 2. Make the log end offset available more easily in the consumer.
> > >
> > > -Jay
> > >
> > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <w.f.funn...@gmail.com>
> > > wrote:
> > >
> > > > We are currently using Kafka 0.8.1.1 with log compaction in order to
> > > > provide streams of messages to our clients.
> > > >
> > > > As well as constantly consuming the stream, one of our use cases is to
> > > > provide a snapshot, meaning the user will receive a copy of every message
> > > > at least once.
> > > >
> > > > Each one of these messages represents an item of content in our system.
> > > >
> > > > The problem comes when determining if the client has actually reached the
> > > > end of the topic.
> > > >
> > > > The standard Kafka way of dealing with this seems to be by using a
> > > > ConsumerTimeoutException, but we are frequently getting this error when the
> > > > end of the topic has not been reached or even it may take a long time
> > > > before a timeout naturally occurs.
> > > >
> > > > On first glance it would seem possible to do a lookup for the max offset
> > > > for each partition when you begin consuming, stopping when this position is
> > > > reached.
> > > >
> > > > But log compaction means that if an update to a piece of content arrives
> > > > with the same message key, then this will be written to the end so the
> > > > snapshot will be incomplete.
> > > >
> > > > Another thought is to make use of the cleaner point. Currently Kafka writes
> > > > out to a "cleaner-offset-checkpoint" file in each data directory which is
> > > > written to after log compaction completes.
> > > >
> > > > If the consumer was able to access the cleaner-offset-checkpoint you would
> > > > be able to consume up to this point, check the point was still the same,
> > > > and compaction had not yet occurred, and therefore determine you had
> > > > received everything at least once. (Assuming there was no race condition
> > > > between compaction and writing to the file)
> > > >
> > > > Has anybody got any thoughts?
> > > >
> > > > Will
> > > >
>
> --
> Will Funnell
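
To make the "repeat the check at the end until you are truly done" idea from
this thread concrete, here is a minimal sketch in Python. It does not use any
Kafka client API: FakePartition, log_end_offset, fetch, compact and snapshot
are all hypothetical names made up for illustration, and the in-memory log only
approximates how a compacted partition behaves.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple

Record = Tuple[int, str, str]  # (offset, key, value)


@dataclass
class FakePartition:
    """Hypothetical in-memory stand-in for one partition of a compacted topic."""
    records: List[Record] = field(default_factory=list)
    next_offset: int = 0

    def append(self, key: str, value: str) -> None:
        self.records.append((self.next_offset, key, value))
        self.next_offset += 1

    def log_end_offset(self) -> int:
        # Offset that the next appended record would receive.
        return self.next_offset

    def fetch(self, start: int, end: int) -> List[Record]:
        # Compaction leaves gaps in the offsets, so filter by range.
        return [r for r in self.records if start <= r[0] < end]

    def compact(self) -> None:
        # Keep only the latest record per key; surviving offsets are unchanged.
        latest: Dict[str, Record] = {}
        for record in self.records:
            latest[record[1]] = record
        self.records = sorted(latest.values())


def snapshot(partition: FakePartition,
             after_pass: Optional[Callable[[], None]] = None) -> Dict[str, str]:
    """Read up to the end offset, re-check it, and repeat until it stops moving."""
    state: Dict[str, str] = {}
    position = 0
    target = partition.log_end_offset()
    while position < target:
        for _offset, key, value in partition.fetch(position, target):
            state[key] = value  # later records for a key overwrite earlier ones
        position = target
        if after_pass is not None:
            after_pass()  # assumption: stands in for concurrent writers/cleaner
        target = partition.log_end_offset()  # repeat the check
    return state
```

Note that under a sustained write rate (the 100-150 writes per second mentioned
above) the end offset may keep moving, so a loop like this can take a long time
to finish, or never finish, without a pause in writes or some cut-off point.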