Hi Dong,

Thanks for the follow-up! I finally have a much clearer understanding of
where you are coming from.

You are right. The success of findOffsets()/finding a point of
non-divergence depends on whether we have enough entries in the consumer's
leader epoch cache. However, I think this is a fundamental limitation of
having a leader epoch cache that does not persist across consumer restarts.
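
To make the dependence on the cache concrete, here is a minimal sketch of how
the consumer could resolve a truncation from its leader epoch cache. It is an
illustration only, not the KIP's implementation: EpochCacheSketch and its
methods are hypothetical names, and the rule "take the smaller of the broker's
end offset and the consumer's own end offset for that epoch" is my assumption
about how the lookup would work.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consumer-side leader epoch cache; not part of KafkaConsumer.
class EpochCacheSketch {
    // leaderEpoch -> first offset this consumer consumed in that epoch
    private final TreeMap<Integer, Long> startOffsets = new TreeMap<>();

    // Remember the start offset of an epoch the first time we see it.
    void record(int leaderEpoch, long offset) {
        startOffsets.putIfAbsent(leaderEpoch, offset);
    }

    // fetchOffset: the position we tried to fetch from.
    // brokerEpoch/brokerEndOffset: the broker's OffsetsForLeaderEpoch answer.
    // Returns the offset to seek to, or -1 if the cache cannot verify it.
    long findOffset(long fetchOffset, int brokerEpoch, long brokerEndOffset) {
        if (!startOffsets.containsKey(brokerEpoch)) {
            return -1L; // e.g. empty cache after a consumer restart
        }
        // Our own end offset for that epoch: the start of the next cached
        // epoch, or the fetch position if it is the last epoch we consumed.
        Map.Entry<Integer, Long> next = startOffsets.higherEntry(brokerEpoch);
        long consumerEndOffset = (next != null) ? next.getValue() : fetchOffset;
        return Math.min(brokerEndOffset, consumerEndOffset);
    }

    public static void main(String[] args) {
        EpochCacheSketch cache = new EpochCacheSketch();
        cache.record(0, 0L);
        cache.record(1, 1L);
        System.out.println(cache.findOffset(3L, 1, 2L)); // 2
        System.out.println(cache.findOffset(2L, 0, 1L)); // 1
        System.out.println(new EpochCacheSketch().findOffset(2L, 0, 1L)); // -1
    }
}
```

With the cache from my earlier example, (leaderEpoch=0, startOffset=0) and
(leaderEpoch=1, startOffset=1), this resolves the two truncations to offsets 2
and 1. With an empty cache, as after a restart on a new machine, the answer
from the broker cannot be checked against anything, which is the limitation
described above.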

If we consider the general case, where the consumer may or may not have this
cache, then I see two paths:
1) Letting the user track the leader epoch history externally, and exposing
more of the leader epoch and the search for the point of non-divergence in
the KafkaConsumer API. I understand this is the case you were talking about.
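
For path 1), the bookkeeping on the user side might look roughly like the
following. This is only a sketch under my own assumptions:
ExternalEpochHistorySketch, onRecord(), serialize() and deserialize() are
hypothetical names, and the "epoch:startOffset" text format stands in for
whatever external store the user already has.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical user-side history of leader epochs, kept outside the consumer.
class ExternalEpochHistorySketch {
    // Each entry is {leaderEpoch, first offset consumed in that epoch}.
    private final List<long[]> entries = new ArrayList<>();

    // Call for every consumed record, in offset order.
    void onRecord(long offset, int leaderEpoch) {
        if (entries.isEmpty() || entries.get(entries.size() - 1)[0] != leaderEpoch) {
            entries.add(new long[] {leaderEpoch, offset});
        }
    }

    // Encode as "epoch:startOffset,epoch:startOffset,..." for the external store.
    String serialize() {
        StringBuilder sb = new StringBuilder();
        for (long[] e : entries) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e[0]).append(':').append(e[1]);
        }
        return sb.toString();
    }

    // Rebuild the history after a restart from the stored text.
    static ExternalEpochHistorySketch deserialize(String stored) {
        ExternalEpochHistorySketch history = new ExternalEpochHistorySketch();
        if (stored.isEmpty()) {
            return history;
        }
        for (String part : stored.split(",")) {
            String[] kv = part.split(":");
            history.entries.add(new long[] {Long.parseLong(kv[0]), Long.parseLong(kv[1])});
        }
        return history;
    }

    public static void main(String[] args) {
        ExternalEpochHistorySketch h = new ExternalEpochHistorySketch();
        h.onRecord(0L, 0);
        h.onRecord(1L, 1);
        h.onRecord(2L, 1);
        System.out.println(h.serialize()); // 0:0,1:1
    }
}
```

After a restart, the restored history could play the role that the in-memory
leader epoch cache plays for a consumer that was never restarted.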



On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com> wrote:

> Hey Anna,
>
> Thanks much for your detailed explanation and example! It does help me
> understand the difference between our understandings.
>
> So it seems that the solution based on findOffsets() currently focuses
> mainly on the scenario where the consumer has a cached leaderEpoch -> offset
> mapping, whereas I was thinking about the general case where the consumer
> may or may not have this cache. I guess that is why we have different
> understandings here. I have some comments below.
>
>
> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed by
> seek(offset) works if the consumer has the cached leaderEpoch -> offset
> mapping. But if we assume the consumer has this cache, do we need to have
> leaderEpoch in findOffsets(...)? Intuitively, findOffsets(offset) can also
> derive the leaderEpoch from the offset, just like the proposed solution
> does with seek(offset).
>
>
> 4) If the consumer does not have a cached leaderEpoch -> offset mapping,
> which is the case if the consumer is restarted on a new machine, then it is
> not clear what leaderEpoch would be included in the FetchRequest if the
> consumer does seek(offset). This is the case that motivates the first
> question of the previous email. In general, maybe we should discuss the
> final solution that covers all cases?
>
>
> 5) The second question in my previous email is related to the following
> paragraph:
>
> "... In some cases, offsets returned from position() could be actual
> consumed messages by this consumer identified by {offset, leader epoch}. In
> other cases, position() returns offset that was not actually consumed.
> Suppose, the user calls position() for the last offset...".
>
> I guess my point is that, if the user calls position() for the last offset
> and uses that offset in seek(...), then the user can probably just call
> Consumer#seekToEnd() without calling position() and seek(...). Similarly,
> the user can call Consumer#seekToBeginning() to seek to the earliest
> position without calling position() and seek(...). Thus position() only
> needs to return the actual consumed messages identified by {offset, leader
> epoch}. Does this make sense?
>
>
> Thanks,
> Dong
>
>
> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <a...@confluent.io> wrote:
>
> > Hi Dong,
> >
> >
> > Thanks for considering my suggestions.
> >
> >
> > Based on your comments, I realized that my suggestion was not complete
> > with regard to the KafkaConsumer API vs. the consumer-broker protocol.
> > While I propose to keep KafkaConsumer#seek() unchanged, taking an offset
> > only, the underlying consumer will send the next FetchRequest() to the
> > broker with the offset and the leaderEpoch if it is known (based on the
> > leader epoch cache in the consumer). Note that this is different from the
> > current KIP, which suggests always sending an unknown leader epoch after
> > seek(). This way, if the consumer and a broker agreed on the point of
> > non-divergence, which is some {offset, leaderEpoch} pair, a new leader
> > which causes another truncation (even further back) will be able to
> > detect the new divergence and restart the process of finding the new
> > point of non-divergence. So, to answer your question: if the truncation
> > happens just after the user calls
> > KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset),
> > the user will not seek to the wrong position without knowing that
> > truncation has happened, because the consumer will get another truncation
> > error, and seek again.
> >
> >
> > I am afraid I did not understand your second question. Let me summarize
> > my suggestions again, and then give an example to hopefully make them
> > clearer. Also, the last part of my example shows how the use-case in your
> > first question will work. If it does not answer your second question,
> > would you mind clarifying? I am also focusing on the case of a consumer
> > having enough entries in the cache. The case of restarting from a
> > committed offset, stored either externally or internally, will probably
> > need to be discussed more.
> >
> >
> > Let me summarize my suggestion again:
> >
> > 1) KafkaConsumer#seek() and KafkaConsumer#position() remain unchanged.
> >
> > 2) New KafkaConsumer#findOffsets() takes an {offset, leaderEpoch} pair per
> > topic partition and returns an offset per topic partition.
> >
> > 3) FetchRequest() to the broker after KafkaConsumer#seek() will contain
> > the offset set by seek and the leaderEpoch that corresponds to that
> > offset, based on the leader epoch cache in the consumer.
> >
> >
> > The rest of this e-mail is a long and contrived example with several log
> > truncations and unclean leader elections to illustrate the API and your
> > first use-case. Suppose we have three brokers. Initially, Brokers A, B,
> > and C each have one message at offset 0 with leader epoch 0. Then, Broker
> > A goes down for some time. Broker B becomes the leader with epoch 1, and
> > writes messages to offsets 1 and 2. Broker C fetches offset 1, but before
> > fetching offset 2, becomes the leader with leader epoch 2 and writes a
> > message at offset 2. Here is the state of the brokers at this point:
> >
> > Broker A:
> >   offset 0, epoch 0  <- leader
> >   goes down…
> >
> > Broker B:
> >   offset 0, epoch 0
> >   offset 1, epoch 1  <- leader
> >   offset 2, epoch 1
> >
> > Broker C:
> >   offset 0, epoch 0
> >   offset 1, epoch 1
> >   offset 2, epoch 2  <- leader
> >
> >
> > Before Broker C becomes a leader with leader epoch 2, the consumer
> > consumed the following messages from broker A and broker B:
> >
> > {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2,
> > leaderEpoch=1}.
> >
> > Consumer’s leader epoch cache at this point contains the following
> > entries:
> >
> > (leaderEpoch=0, startOffset=0)
> >
> > (leaderEpoch=1, startOffset=1)
> >
> > endOffset = 3
> >
> >
> > Then, broker B becomes the follower of broker C, truncates and starts
> > fetching from offset 2.
> >
> > Consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets a
> > LOG_TRUNCATION error from broker C.
> >
> > In response, the client calls KafkaConsumer#findOffsets(offset=3,
> > leaderEpoch=1). The underlying consumer sends
> > OffsetsForLeaderEpoch(leaderEpoch=1), broker C responds with
> > {leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3,
> > leaderEpoch=1) returns offset=2.
> >
> > In response, the consumer calls KafkaConsumer#seek(offset=2) followed by
> > poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to broker
> > C.
> >
> >
> > I will continue with this example with the goal to answer your first
> > question about truncation just after findOffsets() followed by seek():
> >
> > Suppose brokers B and C go down, and broker A comes up and becomes the
> > leader with leader epoch 3, and writes a message to offset 1. Suppose this
> > happens before the consumer gets a response from broker C to the previous
> > fetch request: FetchRequest(offset=2, leaderEpoch=1).
> >
> > Consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which
> > returns a LOG_TRUNCATION error, because on broker A leader epoch 3
> > (greater than the leader epoch in the FetchRequest) starts at offset 1,
> > which is less than offset 2 in the FetchRequest.
> >
> > In response, the user calls KafkaConsumer#findOffsets(offset=2,
> > leaderEpoch=1). The underlying consumer sends
> > OffsetsForLeaderEpoch(leaderEpoch=1), broker A responds with
> > {leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch=0
> > in its cache with end offset == 1, which results in
> > KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset=1.
> >
> > In response, the user calls KafkaConsumer#seek(offset=1) followed by
> > poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to broker
> > A, which responds with the message at offset 1, leader epoch 3.
> >
> >
> > I will think some more about consumers restarting from committed offsets,
> > and send a follow up.
> >
> >
> > Thanks,
> >
> > Anna
> >
> >
> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Anna,
> > >
> > > Thanks much for the thoughtful reply. It makes sense to differentiate
> > > between "seeking to a message" and "seeking to a position". I have two
> > > questions here:
> > >
> > > - For the "seeking to a message" use-case, with the proposed approach
> > > the user needs to call findOffsets(offset, leaderEpoch) followed by
> > > seek(offset). If message truncation and message append happen
> > > immediately after findOffsets(offset, leaderEpoch) but before
> > > seek(offset), it seems that the user will seek to the wrong message
> > > without knowing the truncation has happened. Would this be a problem?
> > >
> > > - For the "seeking to a position" use-case, it seems that there can be
> > > two positions, i.e. earliest and latest. So these two cases can be
> > > fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then
> > > it seems that the user will only need to call position() and seek() for
> > > the "seeking to a message" use-case?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io> wrote:
> > >
> > > > Hi Jason and Dong,
> > > >
> > > >
> > > > I’ve been thinking about your suggestions and discussion regarding
> > > > position(), seek(), and the new proposed API.
> > > >
> > > >
> > > > Here is my thought process on why we should keep the position() and
> > > > seek() APIs unchanged.
> > > >
> > > >
> > > > I think we should separate the {offset, leader epoch} pair that
> > > > uniquely identifies a message from an offset that is a position. In
> > > > some cases, offsets returned from position() could be actual messages
> > > > consumed by this consumer, identified by {offset, leader epoch}. In
> > > > other cases, position() returns an offset that was not actually
> > > > consumed. Suppose the user calls position() for the last offset.
> > > > Suppose we return the {offset, leader epoch} of the message currently
> > > > in the log. Then, the message gets truncated before the consumer’s
> > > > first poll(). It does not make sense for poll() to fail in this case,
> > > > because the log truncation did not actually happen from the consumer’s
> > > > perspective. On the other hand, as the KIP proposes, it makes sense
> > > > for the committed() method to return {offset, leader epoch}, because
> > > > those offsets represent actual consumed messages.
> > > >
> > > >
> > > > The same argument applies to the seek() method: we are not seeking to
> > > > a message, we are seeking to a position.
> > > >
> > > >
> > > > I like the proposal to add the KafkaConsumer#findOffsets() API. I am
> > > > assuming something like:
> > > >
> > > > Map<TopicPartition, Long> findOffsets(
> > > >     Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)
> > > >
> > > > Similar to seek() and position(), I think findOffsets() should return
> > > > an offset without a leader epoch, because what we want is the offset
> > > > that we think is closest to the non-divergent message from the given
> > > > consumed message. Until the consumer actually fetches the message, we
> > > > should not let the consumer store the leader epoch for a message it
> > > > did not consume.
> > > >
> > > >
> > > > So, the workflow will be:
> > > >
> > > > 1) The user gets LogTruncationException with {offset, leader epoch of
> > > > the previous message} (whatever we send with the new FetchRecords
> > > > request).
> > > >
> > > > 2) offset = findOffsets(tp -> {offset, leader epoch})
> > > >
> > > > 3) seek(offset)
> > > >
> > > >
> > > > For the use-case where the users store committed offsets externally:
> > > >
> > > > 1) Such users would have to track the leader epoch together with an
> > > > offset. Otherwise, there is no way to detect later what leader epoch
> > > > was associated with the message. I think it’s reasonable to ask that
> > > > from users if they want to detect log truncation. Otherwise, they will
> > > > get the current behavior.
> > > >
> > > >
> > > > If the users currently get an offset to be stored using position(), I
> > > > see two possibilities. First, they save the offset returned from a
> > > > position() call made before poll(). In that case, it would not be
> > > > correct to store {offset, leader epoch} if we had changed position()
> > > > to return {offset, leader epoch}, since the actual fetched message
> > > > could be different (from the example I described earlier). So, it
> > > > would be more correct to call position() after poll(). However, the
> > > > user already gets ConsumerRecords at this point, from which the user
> > > > can extract the {offset, leader epoch} of the last message.
> > > >
> > > >
> > > > So, I like the idea of adding a helper method to ConsumerRecords, as
> > > > Jason proposed, something like:
> > > >
> > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where
> > > > OffsetAndEpoch is a data struct holding {offset, leader epoch}.
> > > >
> > > >
> > > > In this case, we would advise the user to follow the workflow: poll(),
> > > > get {offset, leader epoch} from
> > > > ConsumerRecords#lastOffsetWithLeaderEpoch(), save the offset and
> > > > leader epoch, process records.
> > > >
> > > >
> > > > 2) When the user needs to seek to the last committed offset, they call
> > > > the new findOffsets(saved offset, leader epoch), and then seek(offset).
> > > >
> > > >
> > > > What do you think?
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Anna
> > > >
> > > >
> > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hey Jason,
> > > > >
> > > > > Thanks much for your thoughtful explanation.
> > > > >
> > > > > Yes, the solution using findOffsets(offset, leaderEpoch) also works.
> > > > > The advantage of this solution is that it adds only one API instead
> > > > > of two APIs. The concern is that its usage seems a bit more clumsy
> > > > > for advanced users. More specifically, advanced users who store
> > > > > offsets externally will always need to call findOffsets() before
> > > > > calling seek(offset) during consumer initialization. And those
> > > > > advanced users will need to manually keep track of the leaderEpoch
> > > > > of the last ConsumerRecord.
> > > > >
> > > > > The other solution, which may be more user-friendly for advanced
> > > > > users, is to add two APIs, `void seek(offset, leaderEpoch)` and
> > > > > `(offset, epoch) = offsetEpochs(topicPartition)`.
> > > > >
> > > > > I kind of prefer the second solution because it is easier to use for
> > > > > advanced users. If we need to expose leaderEpoch anyway to safely
> > > > > identify a message, it may be conceptually simpler to expose it
> > > > > directly in seek(...) rather than requiring one more translation
> > > > > using findOffsets(...). But I am also OK with the first solution if
> > > > > other developers also favor that one :)
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > >
> > > > > > Hi Dong,
> > > > > >
> > > > > > Thanks, I've been thinking about your suggestions a bit. It is
> > > > > > challenging to make this work given the current APIs. One of the
> > > > > > difficulties is that we don't have an API to find the leader epoch
> > > > > > for a given offset at the moment. So if the user does a seek to
> > > > > > offset 5, then we'll need a new API to find the corresponding
> > > > > > epoch in order to fulfill the new position() API. Potentially we
> > > > > > could modify ListOffsets to enable finding the leader epoch, but I
> > > > > > am not sure it is worthwhile. Perhaps it is reasonable for
> > > > > > advanced usage to expect that the epoch information, if needed,
> > > > > > will be extracted from the records directly? It might make sense
> > > > > > to expose a helper in `ConsumerRecords` to make this a little
> > > > > > easier though.
> > > > > >
> > > > > > Alternatively, if we think it is important to have this
> > > > > > information exposed directly, we could create batch APIs to solve
> > > > > > the naming problem. For example:
> > > > > >
> > > > > > Map<TopicPartition, OffsetAndEpoch> positions();
> > > > > > void seek(Map<TopicPartition, OffsetAndEpoch> positions);
> > > > > >
> > > > > > However, I'm actually leaning toward leaving the seek() and
> > > > > > position() APIs unchanged. Instead, we can add a new API to search
> > > > > > for offset by timestamp or by offset/leader epoch. Let's say we
> > > > > > call it `findOffsets`. If the user hits a log truncation error,
> > > > > > they can use this API to find the closest offset and then do a
> > > > > > seek(). At the same time, we deprecate the `offsetsForTimes` APIs.
> > > > > > We now have two use cases which require finding offsets, so I
> > > > > > think we should make this API general and leave the door open for
> > > > > > future extensions.
> > > > > >
> > > > > > By the way, I'm unclear about the desire to move part of this
> > > > > > functionality to AdminClient. Guozhang suggested this previously,
> > > > > > but I think it only makes sense for cross-cutting capabilities
> > > > > > such as topic creation. If we have an API which is primarily
> > > > > > useful to consumers, then I think that's where it should be
> > > > > > exposed. The AdminClient also has its own API integrity and should
> > > > > > not become a dumping ground for advanced use cases. I'll update
> > > > > > the KIP with the `findOffsets` API suggested above and we can see
> > > > > > if it does a good enough job of keeping the API simple for common
> > > > > > cases.
> > > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > > >
> > > > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Jason,
> > > > > > >
> > > > > > > Regarding seek(...), it seems that we want an API for the user
> > > > > > > to initialize the consumer with (offset, leaderEpoch), and that
> > > > > > > API should allow throwing PartitionTruncationException. Suppose
> > > > > > > we agree on this; then seekToNearest() is not sufficient because
> > > > > > > it will always swallow PartitionTruncationException. Here we
> > > > > > > have two options. The first option is to add the API
> > > > > > > offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to
> > > > > > > offset. The second option is to add seek(offset, leaderEpoch).
> > > > > > > It seems that the second option may be simpler because it makes
> > > > > > > it clear that (offset, leaderEpoch) will be used to identify the
> > > > > > > consumer's position in a partition. And the user only needs to
> > > > > > > handle PartitionTruncationException from poll(). In comparison,
> > > > > > > the first option seems a bit harder to use because the user also
> > > > > > > has to handle the PartitionTruncationException if
> > > > > > > offsetsForLeaderEpochs() returns a different offset from the
> > > > > > > user-provided offset. What do you think?
> > > > > > >
> > > > > > > If we decide to add the API seek(offset, leaderEpoch), then we
> > > > > > > can decide whether and how to add an API to translate (offset,
> > > > > > > leaderEpoch) to offset. It seems that this API will be needed by
> > > > > > > advanced users who don't want auto offset reset (so that they
> > > > > > > can be notified) but still want to reset the offset to the
> > > > > > > closest one. For those users it probably makes sense to only
> > > > > > > have the API in AdminClient. offsetsForTimes() seems like a
> > > > > > > common API that will be needed by users of the consumer in
> > > > > > > general, so it may be more reasonable for it to stay in the
> > > > > > > consumer API. I don't have a strong opinion on whether
> > > > > > > offsetsForTimes() should be replaced by an API in AdminClient.
> > > > > > >
> > > > > > > Though (offset, leaderEpoch) is needed to uniquely identify a
> > > > > > > message in general, it is only needed for advanced users who
> > > > > > > have turned on unclean leader election, need to use seek(..),
> > > > > > > and don't want auto offset reset. Most other users probably just
> > > > > > > want to enable auto offset reset and store offsets in Kafka.
> > > > > > > Thus we might want to keep the existing offset-only APIs (e.g.
> > > > > > > seek() and position()) for most users while adding new APIs for
> > > > > > > advanced users. And yes, it seems that we need a new name for
> > > > > > > position().
> > > > > > >
> > > > > > > Though I think we need new APIs to carry the new information
> > > > > > > (e.g. leaderEpoch), I am not very sure what they should look
> > > > > > > like. One possible option is those APIs in KIP-232. Another
> > > > > > > option is something like this:
> > > > > > >
> > > > > > > `````
> > > > > > > class OffsetEpochs {
> > > > > > >   long offset;
> > > > > > >   int leaderEpoch;
> > > > > > >   int partitionEpoch;   // This may be needed later, as discussed in KIP-232
> > > > > > >   ... // Hopefully these are all we need to identify a message in Kafka.
> > > > > > >       // But if we need more, then we can add new fields in this class.
> > > > > > > }
> > > > > > >
> > > > > > > OffsetEpochs offsetEpochs(TopicPartition);
> > > > > > >
> > > > > > > void seek(TopicPartition, OffsetEpochs);
> > > > > > > `````
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hey Dong,
> > > > > > > >
> > > > > > > > Thanks for the feedback. The first three points are easy:
> > > > > > > >
> > > > > > > > 1. Yes, we should be consistent.
> > > > > > > > 2. Yes, I will add this.
> > > > > > > > 3. Yes, I think we should document the changes to the
> > > > > > > > committed offset schema. I meant to do this, but it slipped my
> > > > > > > > mind.
> > > > > > > >
> > > > > > > > The latter questions are tougher. One option I was considering
> > > > > > > > is to have only `offsetsForLeaderEpochs` exposed from the
> > > > > > > > consumer and to drop the new seek() API. That seems more
> > > > > > > > consistent with the current use of `offsetsForTimes` (we don't
> > > > > > > > have a separate `seekToTimestamp` API). An alternative might
> > > > > > > > be to take a page from the AdminClient API and add a new
> > > > > > > > method to generalize offset lookup. For example, we could have
> > > > > > > > `lookupOffsets(LookupOptions)`. We could then deprecate
> > > > > > > > `offsetsForTimes` and this would open the door for future
> > > > > > > > extensions without needing new APIs.
> > > > > > > >
> > > > > > > > The case of position() is a little more annoying. It would
> > > > > > > > have been better had we let this return an object so that it
> > > > > > > > is easier to extend. This is the only reason I didn't add the
> > > > > > > > API to the KIP. Maybe we should bite the bullet and fix this
> > > > > > > > now? Unfortunately we'll have to come up with a new name.
> > > > > > > > Maybe `currentPosition`?
> > > > > > > >
> > > > > > > > Thoughts?
> > > > > > > >
> > > > > > > > -Jason
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Regarding points 4) and 5) above, the motivation for the
> > > > > > > > > alternative APIs is that, if we decide that leaderEpoch is
> > > > > > > > > as important as offset in identifying a message, then it may
> > > > > > > > > be reasonable to always specify it wherever offset is
> > > > > > > > > currently required in the consumer API to identify a
> > > > > > > > > message, e.g. position(), seek(). For example, since we
> > > > > > > > > allow the user to retrieve the offset using position()
> > > > > > > > > instead of asking the user to keep track of the offset of
> > > > > > > > > the latest ConsumerRecord, maybe it would be more consistent
> > > > > > > > > for the user to also retrieve leaderEpoch using position()?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hey Jason,
> > > > > > > > > >
> > > > > > > > > > Thanks for the update. It looks pretty good. Just some
> > > > > > > > > > minor comments below:
> > > > > > > > > >
> > > > > > > > > > 1) The KIP adds new error code "LOG_TRUNCATION" and new
> > > > > > > > > > exception TruncatedPartitionException. Can we make the
> > > > > > > > > > name more consistent, e.g. LogTruncationException?
> > > > > > > > > >
> > > > > > > > > > 2) Do we need to add UnknownLeaderEpochException as part
> > > > > > > > > > of the API change?
> > > > > > > > > >
> > > > > > > > > > 3) Not sure if the offset topic schema is also public API.
> > > > > > > > > > If so, maybe we should also include the schema change in
> > > > > > > > > > the API?
> > > > > > > > > >
> > > > > > > > > > 4) For users who store offsets externally, currently they
> > > > > > > > > > get the offset using position(..), store the offset
> > > > > > > > > > externally, and use seek(..) to initialize the consumer
> > > > > > > > > > next time. After this KIP they will need to store and use
> > > > > > > > > > the leaderEpoch together with the offset. Should we also
> > > > > > > > > > update the API so that the user can also get leaderEpoch
> > > > > > > > > > from position(...)? Not sure if it is OK to ask the user
> > > > > > > > > > to track the latest leaderEpoch of ConsumerRecord by
> > > > > > > > > > themselves.
> > > > > > > > > >
> > > > > > > > > > 5) Also for users who store offsets externally, they need
> > > > > > > > > > to call seek(..) with leaderEpoch to initialize the
> > > > > > > > > > consumer. With the current KIP users need to call
> > > > > > > > > > seekToNearest(), whose name suggests that the final
> > > > > > > > > > position may be different from what was requested.
> > > > > > > > > > However, if users want to avoid auto offset reset and be
> > > > > > > > > > notified explicitly when there is log truncation, then
> > > > > > > > > > seekToNearest() probably does not help here. Would it make
> > > > > > > > > > sense to replace seekToNearest() with seek(offset,
> > > > > > > > > > leaderEpoch) + AdminClient.offsetsForLeaderEpochs(...)?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Dong
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > >> Hey Guozhang,
> > > > > > > > > >>
> > > > > > > > > > >> That's fair. In fact, perhaps we do not need this API at
> > > > > > > > > > >> all. We already have the new seek() in this KIP which can
> > > > > > > > > > >> do the lookup based on epoch for this use case. I guess
> > > > > > > > > > >> we should probably call it seekToNearest() though to make
> > > > > > > > > > >> it clear that the final position may be different from
> > > > > > > > > > >> what was requested.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >> Jason
> > > > > > > > > >>
> > > > > > > > > > >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hi Jason,
> > > > > > > > > >> >
> > > > > > > > > > >> > I think it is less worthwhile to add
> > > > > > > > > > >> > KafkaConsumer#offsetsForLeaderEpochs, since probably only
> > > > > > > > > > >> > very advanced users are aware of the leaderEpoch, and
> > > > > > > > > > >> > hence ever care to use it anyway. It is more like an
> > > > > > > > > > >> > admin client operation than a consumer client operation:
> > > > > > > > > > >> > if the motivation is to facilitate a customized reset
> > > > > > > > > > >> > policy, maybe adding it as
> > > > > > > > > > >> > AdminClient#offsetsForLeaderEpochs is better, as it is
> > > > > > > > > > >> > not an aggressive assumption that such advanced users are
> > > > > > > > > > >> > willing to use some admin client to get further
> > > > > > > > > > >> > information?
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Guozhang
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > >> >
> > > > > > > > > > >> > > Thanks for the feedback. I've updated the KIP.
> > > > > > > > > > >> > > Specifically I removed the "closest" reset option and
> > > > > > > > > > >> > > the proposal to reset by timestamp when the precise
> > > > > > > > > > >> > > truncation point cannot be determined. Instead, I
> > > > > > > > > > >> > > proposed that we always reset using the nearest epoch
> > > > > > > > > > >> > > when a reset policy is defined (either "earliest" or
> > > > > > > > > > >> > > "latest"). Does that sound reasonable?
> > > > > > > > > >> > >
> > > > > > > > > >> > > One thing I am still debating is whether it would be
> > > > better
> > > > > to
> > > > > > > > have
> > > > > > > > > a
> > > > > > > > > >> > > separate API to find the closest offset using the
> > leader
> > > > > > epoch.
> > > > > > > In
> > > > > > > > > the
> > > > > > > > > > >> > > current KIP, I suggested piggybacking this
> information
> > > on
> > > > an
> > > > > > > > > >> exception,
> > > > > > > > > >> > but
> > > > > > > > > >> > > I'm beginning to think it would be better not to
> hide
> > > the
> > > > > > > lookup.
> > > > > > > > It
> > > > > > > > > >> is
> > > > > > > > > >> > > awkward to implement since it means delaying the
> > > exception
> > > > > and
> > > > > > > the
> > > > > > > > > API
> > > > > > > > > >> > may
> > > > > > > > > >> > > actually be useful when customizing reset logic if
> no
> > > auto
> > > > > > reset
> > > > > > > > > >> policy
> > > > > > > > > >> > is
> > > > > > > > > >> > > defined. I was thinking we can add an API like the
> > > > > following:
> > > > > > > > > >> > >
> > > > > > > > > >> > > Map<TopicPartition, OffsetAndEpoch>
> > > > > > > > > >> > > offsetsForLeaderEpochs(Map<TopicPartition, Integer>
> > > > > > > > epochsToSearch)
> > > > > > > > > >> > >
> > > > > > > > > >> > > Thoughts?
> > > > > > > > > >> > >
> > > > > > > > > >> > > -Jason
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > >> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson <
> > > > > > > > > ja...@confluent.io
> > > > > > > > > >> >
> > > > > > > > > >> > > wrote:
> > > > > > > > > >> > >
> > > > > > > > > >> > > > @Dong
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Those are fair points. Both approaches require
> some
> > > > > > fuzziness
> > > > > > > to
> > > > > > > > > >> reset
> > > > > > > > > >> > > the
> > > > > > > > > >> > > > offset in these pathological scenarios and we
> cannot
> > > > > > guarantee
> > > > > > > > > >> > > > at-least-once delivery either way unless we have
> the
> > > > full
> > > > > > > > history
> > > > > > > > > of
> > > > > > > > > >> > > leader
> > > > > > > > > >> > > > epochs that were consumed. The KIP-101 logic may
> > > > actually
> > > > > be
> > > > > > > > more
> > > > > > > > > >> > > accurate
> > > > > > > > > >> > > > than using timestamps because it does not depend
> on
> > > the
> > > > > > > messages
> > > > > > > > > >> which
> > > > > > > > > >> > > are
> > > > > > > > > >> > > > written after the unclean leader election. The
> case
> > > > we're
> > > > > > > > talking
> > > > > > > > > >> about
> > > > > > > > > >> > > > should be extremely rare in practice anyway. I
> also
> > > > agree
> > > > > > that
> > > > > > > > we
> > > > > > > > > >> may
> > > > > > > > > >> > not
> > > > > > > > > >> > > > want to add new machinery if it only helps the old
> > > > message
> > > > > > > > format.
> > > > > > > > > >> Ok,
> > > > > > > > > >> > > > let's go ahead and drop the timestamp.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > @Guozhang
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > * My current understanding is that, with unclean
> > > leader
> > > > > > > election
> > > > > > > > > >> turned
> > > > > > > > > >> > > on,
> > > > > > > > > >> > > >> exactly-once is out of the window since we cannot
> > > > > guarantee
> > > > > > > > that
> > > > > > > > > >> all
> > > > > > > > > >> > > >> committed message markers will not be lost. And
> > hence
> > > > > there
> > > > > > > is
> > > > > > > > no
> > > > > > > > > >> need
> > > > > > > > > >> > > to
> > > > > > > > > >> > > >> have special handling logic for LOG_TRUNCATED or
> > OOR
> > > > > error
> > > > > > > > codes
> > > > > > > > > >> with
> > > > > > > > > >> > > >> read.committed turned on. Is that right?
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Yes, that's right. EoS and unclean leader election
> > > don't
> > > > > mix
> > > > > > > > well.
> > > > > > > > > >> It
> > > > > > > > > >> > may
> > > > > > > > > >> > > > be worth considering separately whether we should
> > try
> > > to
> > > > > > > > reconcile
> > > > > > > > > >> the
> > > > > > > > > >> > > > transaction log following an unclean leader
> > election.
> > > At
> > > > > > least
> > > > > > > > we
> > > > > > > > > >> may
> > > > > > > > > >> > be
> > > > > > > > > >> > > > able to prevent dangling transactions from
> blocking
> > > > > > consumers.
> > > > > > > > > This
> > > > > > > > > >> KIP
> > > > > > > > > >> > > > does not address this problem.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > * MINOR: "if the epoch is greater than the minimum
> > > > > expected
> > > > > > > > epoch,
> > > > > > > > > >> that
> > > > > > > > > >> > > the
> > > > > > > > > >> > > >> new epoch does not begin at an earlier offset
> than
> > > the
> > > > > > fetch
> > > > > > > > > >> offset.
> > > > > > > > > >> > In
> > > > > > > > > >> > > >> the latter case, the leader can respond with a
> new
> > > > > > > > LOG_TRUNCATION
> > > > > > > > > >> > error
> > > > > > > > > >> > > >> code" should it be "does not begin at a later
> > offset
> > > > than
> > > > > > the
> > > > > > > > > fetch
> > > > > > > > > >> > > >> offset"?
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > I think the comment is correct, though the
> phrasing
> > > may
> > > > be
> > > > > > > > > >> confusing.
> > > > > > > > > >> > We
> > > > > > > > > >> > > > know truncation has occurred if there exists a
> > larger
> > > > > epoch
> > > > > > > > with a
> > > > > > > > > >> > > starting
> > > > > > > > > >> > > > offset that is lower than the fetch offset. Let me
> > try
> > > > to
> > > > > > > > rephrase
> > > > > > > > > >> > this.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Thanks,
> > > > > > > > > >> > > > Jason
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang <
> > > > > > > > > wangg...@gmail.com>
> > > > > > > > > >> > > wrote:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >> Jason, thanks for the KIP. A few comments:
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >> * I think Dong's question about whether to use
> > > > > > > timestamp-based
> > > > > > > > > >> > approach
> > > > > > > > > > >> > > >> vs. start-offset-of-first-larger-epoch is valid;
> > > more
> > > > > > > > > >> specifically,
> > > > > > > > > >> > > with
> > > > > > > > > > >> > > >> timestamp-based approach we may still be resetting
> > to
> > > an
> > > > > > > offset
> > > > > > > > > >> falling
> > > > > > > > > >> > > >> into
> > > > > > > > > >> > > >> the truncated interval, and hence we may still
> miss
> > > > some
> > > > > > > data,
> > > > > > > > > i.e.
> > > > > > > > > >> > not
> > > > > > > > > >> > > >> guaranteeing at-least-once still. With the
> > > > > > > > > >> > > >> start-offset-of-first-larger-epoch, I'm not sure
> > if
> > > it
> > > > > > will
> > > > > > > > > >> guarantee
> > > > > > > > > >> > > no
> > > > > > > > > >> > > >> valid data is missed when we have consecutive log
> > > > > > truncations
> > > > > > > > > >> (maybe
> > > > > > > > > >> > we
> > > > > > > > > >> > > >> need to look back into details of KIP-101 to
> figure
> > > it
> > > > > > out).
> > > > > > > If
> > > > > > > > > the
> > > > > > > > > >> > > latter
> > > > > > > > > >> > > >> can indeed guarantee at least once, we could
> > consider
> > > > > using
> > > > > > > > that
> > > > > > > > > >> > > approach.
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >> * My current understanding is that, with unclean
> > > leader
> > > > > > > > election
> > > > > > > > > >> > turned
> > > > > > > > > >> > > >> on,
> > > > > > > > > >> > > >> exactly-once is out of the window since we cannot
> > > > > guarantee
> > > > > > > > that
> > > > > > > > > >> all
> > > > > > > > > >> > > >> committed message markers will not be lost. And
> > hence
> > > > > there
> > > > > > > is
> > > > > > > > no
> > > > > > > > > >> need
> > > > > > > > > >> > > to
> > > > > > > > > >> > > >> have special handling logic for LOG_TRUNCATED or
> > OOR
> > > > > error
> > > > > > > > codes
> > > > > > > > > >> with
> > > > > > > > > >> > > >> read.committed turned on. Is that right?
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >> * MINOR: "if the epoch is greater than the
> minimum
> > > > > expected
> > > > > > > > > epoch,
> > > > > > > > > >> > that
> > > > > > > > > >> > > >> the
> > > > > > > > > >> > > >> new epoch does not begin at an earlier offset
> than
> > > the
> > > > > > fetch
> > > > > > > > > >> offset.
> > > > > > > > > >> > In
> > > > > > > > > >> > > >> the latter case, the leader can respond with a
> new
> > > > > > > > LOG_TRUNCATION
> > > > > > > > > >> > error
> > > > > > > > > >> > > >> code" should it be "does not begin at a later
> > offset
> > > > than
> > > > > > the
> > > > > > > > > fetch
> > > > > > > > > >> > > >> offset"?
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >> Guozhang
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin <
> > > > > > > lindon...@gmail.com
> > > > > > > > >
> > > > > > > > > >> > wrote:
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >> > Hey Jason,
> > > > > > > > > >> > > >> >
> > > > > > > > > >> > > >> > Thanks for the explanation.
> > > > > > > > > >> > > >> >
> > > > > > > > > >> > > >> > Please correct me if this is wrong. The
> "unknown
> > > > > > truncation
> > > > > > > > > >> offset"
> > > > > > > > > >> > > >> > scenario happens when consumer does not have
> the
> > > full
> > > > > > > > > >> leaderEpoch ->
> > > > > > > > > >> > > >> offset
> > > > > > > > > >> > > >> > mapping. In this case we can still use the
> > > > > KIP-101-based
> > > > > > > > > >> approach to
> > > > > > > > > >> > > >> > truncate offset to "start offset of the first
> > > Leader
> > > > > > Epoch
> > > > > > > > > larger
> > > > > > > > > >> > than
> > > > > > > > > >> > > >> last
> > > > > > > > > >> > > >> > epoch of the consumer" but it may be
> inaccurate.
> > So
> > > > the
> > > > > > KIP
> > > > > > > > > >> chooses
> > > > > > > > > >> > to
> > > > > > > > > >> > > >> use
> > > > > > > > > >> > > >> > the timestamp-based approach which is also
> > > > best-effort.
> > > > > > > > > >> > > >> >
> > > > > > > > > >> > > >> > If this understanding is correct, for "closest"
> > > > offset
> > > > > > > reset
> > > > > > > > > >> policy
> > > > > > > > > >> > > and
> > > > > > > > > >> > > >> > "unknown truncation offset" scenario, I am
> > > wondering
> > > > > > > whether
> > > > > > > > it
> > > > > > > > > > >> > may be
> > > > > > > > > >> > > >> > better to replace timestamp-based approach with
> > > > KIP-101
> > > > > > > based
> > > > > > > > > >> > > approach.
> > > > > > > > > >> > > >> In
> > > > > > > > > >> > > >> > comparison to timestamp-based approach, the
> > > > > KIP-101-based
> > > > > > > > > >> approach
> > > > > > > > > >> > > >> seems to
> > > > > > > > > >> > > >> > simplify the API a bit since user does not need
> > to
> > > > > > > understand
> > > > > > > > > >> > > timestamp.
> > > > > > > > > >> > > >> > Similar to the timestamp-based approach, both
> > > > > approaches
> > > > > > > are
> > > > > > > > > >> > > best-effort
> > > > > > > > > >> > > >> > and do not guarantee that consumer can consume
> > all
> > > > > > > messages.
> > > > > > > > It
> > > > > > > > > >> is
> > > > > > > > > >> > not
> > > > > > > > > >> > > >> like
> > > > > > > > > >> > > >> > KIP-279 which guarantees that follower broker
> can
> > > > > consume
> > > > > > > all
> > > > > > > > > >> > messages
> > > > > > > > > >> > > >> from
> > > > > > > > > >> > > >> > the leader.
> > > > > > > > > >> > > >> >
> > > > > > > > > >> > > >> > Then it seems that the remaining difference is
> > > mostly
> > > > > > about
> > > > > > > > > >> > accuracy,
> > > > > > > > > >> > > >> i.e.
> > > > > > > > > > >> > > >> > how many messages will be duplicated or missed
> in
> > > the
> > > > > > > "unknown
> > > > > > > > > >> > > truncation
> > > > > > > > > >> > > >> > offset" scenario. Not sure either one is
> clearly
> > > > better
> > > > > > > than
> > > > > > > > > the
> > > > > > > > > >> > > other.
> > > > > > > > > >> > > >> > Note that there are two scenarios mentioned in
> > > > KIP-279
> > > > > > > which
> > > > > > > > > are
> > > > > > > > > >> not
> > > > > > > > > >> > > >> > addressed by KIP-101. Both scenarios require
> > quick
> > > > > > > leadership
> > > > > > > > > >> change
> > > > > > > > > >> > > >> > between brokers, which seems to suggest that
> the
> > > > offset
> > > > > > > based
> > > > > > > > > >> > obtained
> > > > > > > > > >> > > >> > by "start
> > > > > > > > > >> > > >> > offset of the first Leader Epoch larger than
> last
> > > > epoch
> > > > > > of
> > > > > > > > the
> > > > > > > > > >> > > consumer"
> > > > > > > > > >> > > >> > under these two scenarios may be very close to
> > the
> > > > > offset
> > > > > > > > > >> obtained
> > > > > > > > > >> > by
> > > > > > > > > >> > > >> the
> > > > > > > > > >> > > >> > message timestamp. Does this sound reasonable?
> > > > > > > > > >> > > >> >
> > > > > > > > > >> > > >> > Good point that users on v1 format can get
> > benefit
> > > > with
> > > > > > > > > timestamp
> > > > > > > > > >> > > based
> > > > > > > > > >> > > >> > approach. On the other hand it seems like a
> short
> > > > term
> > > > > > > > benefit
> > > > > > > > > >> for
> > > > > > > > > >> > > users
> > > > > > > > > >> > > >> > who have not migrated. I am just not sure
> whether
> > > it
> > > > is
> > > > > > > more
> > > > > > > > > >> > important
> > > > > > > > > >> > > >> than
> > > > > > > > > >> > > >> > designing a better API.
> > > > > > > > > >> > > >> >
> > > > > > > > > >> > > >> > Also, for both "latest" and "earliest" reset
> > > policy,
> > > > do
> > > > > > you
> > > > > > > > > >> think it
> > > > > > > > > >> > > >> would
> > > > > > > > > >> > > >> > make sense to also use the KIP-101 based
> approach
> > > to
> > > > > > > truncate
> > > > > > > > > >> offset
> > > > > > > > > >> > > for
> > > > > > > > > >> > > >> > the "unknown truncation offset" scenario?
> > > > > > > > > >> > > >> >
> > > > > > > > > >> > > >> >
> > > > > > > > > >> > > >> > Thanks,
> > > > > > > > > >> > > >> > Dong
> > > > > > > > > >> > > >> >
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >> --
> > > > > > > > > >> > > >> -- Guozhang
> > > > > > > > > >> > > >>
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > --
> > > > > > > > > >> > -- Guozhang
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
