Hey Anna,

Thanks for the comment. To answer your question, it seems that we can cover
all cases in this KIP. As stated in the "Consumer Handling" section, a KIP-101
based approach will be used to derive the truncation offset from the
2-tuple (offset, leaderEpoch). This approach is best effort and it is
inaccurate only in very rare scenarios (as described in KIP-279).
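
To make the derivation concrete, here is a minimal sketch of how a
truncation offset could be derived from (offset, leaderEpoch), using the
broker states from Anna's example further down the thread. The names
(TruncationSketch, LeaderLog, endOffsetFor, truncationOffset) are made up
for illustration and are not part of the KIP or of the Kafka API. The
leader's epoch cache is modeled as an in-memory map, and the handling of a
requested epoch older than anything the leader knows is my assumption.

```java
import java.util.Map;
import java.util.TreeMap;

final class TruncationSketch {

    /** Hypothetical in-memory stand-in for a leader's epoch -> start offset cache. */
    static final class LeaderLog {
        final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
        final long logEndOffset;

        LeaderLog(long logEndOffset) {
            this.logEndOffset = logEndOffset;
        }

        LeaderLog epoch(int epoch, long startOffset) {
            epochStartOffsets.put(epoch, startOffset);
            return this;
        }

        /**
         * End offset of the largest epoch <= requestedEpoch: the start offset
         * of the next epoch, or the log end offset if there is no later epoch.
         */
        long endOffsetFor(int requestedEpoch) {
            Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(requestedEpoch);
            if (floor == null) {
                // Assumption: the leader knows no epoch <= requestedEpoch, so
                // nothing before its first known epoch can be trusted.
                return epochStartOffsets.isEmpty()
                        ? logEndOffset
                        : epochStartOffsets.firstEntry().getValue();
            }
            Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(floor.getKey());
            return next == null ? logEndOffset : next.getValue();
        }
    }

    /** Best-effort truncation offset for a consumer positioned at (offset, leaderEpoch). */
    static long truncationOffset(LeaderLog leader, long offset, int leaderEpoch) {
        return Math.min(offset, leader.endOffsetFor(leaderEpoch));
    }

    /** Truncation is detected when the derived offset is behind the consumer's offset. */
    static boolean truncated(LeaderLog leader, long offset, int leaderEpoch) {
        return truncationOffset(leader, offset, leaderEpoch) < offset;
    }

    public static void main(String[] args) {
        // Broker C from the example: epochs 0, 1, 2 start at offsets 0, 1, 2.
        LeaderLog brokerC = new LeaderLog(3).epoch(0, 0).epoch(1, 1).epoch(2, 2);
        System.out.println(truncationOffset(brokerC, 3, 1)); // prints 2

        // Broker A after the second unclean election: epoch 0 at 0, epoch 3 at 1.
        LeaderLog brokerA = new LeaderLog(2).epoch(0, 0).epoch(3, 1);
        System.out.println(truncationOffset(brokerA, 2, 1)); // prints 1
    }
}
```

Both results match the offsets that findOffsets() returns in Anna's
walkthrough (2 against broker C, then 1 against broker A).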

By using seek(offset, leaderEpoch), the consumer will still be able to follow
this best-effort approach to detect log truncation and determine the
truncation offset. On the other hand, if we use seek(offset), the consumer
will not detect log truncation in some cases, which weakens the guarantee of
this KIP. Does this make sense?
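
As a rough illustration of the difference (again a sketch with made-up
names, not the KIP's implementation): if the fetch carries no leader epoch,
the leader can only range-check the offset, whereas with the last consumed
epoch it can compare against its own epoch history. The example reuses
broker C's state from Anna's walkthrough, with the consumer positioned at
offset 3 after consuming offset 2 in epoch 1.

```java
import java.util.OptionalInt;
import java.util.TreeMap;

final class SeekComparisonSketch {

    /**
     * Returns true if the leader would notice that the consumer's position
     * diverges from its log. epochStartOffsets is a hypothetical model of the
     * leader's epoch -> start offset cache.
     */
    static boolean fetchNoticesDivergence(TreeMap<Integer, Long> epochStartOffsets,
                                          long logEndOffset,
                                          long fetchOffset,
                                          OptionalInt lastConsumedEpoch) {
        if (!lastConsumedEpoch.isPresent()) {
            // Offset only: the leader can do no more than a range check.
            return fetchOffset > logEndOffset;
        }
        // With an epoch: find where the largest epoch <= the consumer's epoch
        // ends on this leader, and compare that with the fetch offset.
        Integer known = epochStartOffsets.floorKey(lastConsumedEpoch.getAsInt());
        Integer next = (known == null)
                ? (epochStartOffsets.isEmpty() ? null : epochStartOffsets.firstKey())
                : epochStartOffsets.higherKey(known);
        long endOffset = (next == null) ? logEndOffset : epochStartOffsets.get(next);
        return endOffset < fetchOffset;
    }

    /** Broker C from the example: epochs 0, 1, 2 start at offsets 0, 1, 2. */
    static TreeMap<Integer, Long> brokerC() {
        TreeMap<Integer, Long> epochs = new TreeMap<>();
        epochs.put(0, 0L);
        epochs.put(1, 1L);
        epochs.put(2, 2L);
        return epochs;
    }

    public static void main(String[] args) {
        // seek(offset): offset 3 is within broker C's log, so nothing is noticed.
        System.out.println(fetchNoticesDivergence(brokerC(), 3L, 3L, OptionalInt.empty())); // false
        // seek(offset, leaderEpoch): epoch 1 ends at offset 2 on broker C, so
        // a position of 3 in epoch 1 is recognized as truncated.
        System.out.println(fetchNoticesDivergence(brokerC(), 3L, 3L, OptionalInt.of(1))); // true
    }
}
```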

Thanks,
Dong

On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <a...@confluent.io> wrote:

> Sorry, I hit "send" before finishing. Continuing...
>
>
> 2) Hiding most of the consumer handling log truncation logic with minimal
> exposure in KafkaConsumer API.  I was proposing this path.
>
>
> Before answering your specific questions… I want to respond to your comment
> “In general, maybe we should discuss the final solution that covers all
> cases?”. With current KIP, we don’t cover all cases of consumer detecting
> log truncation because the KIP proposes a leader epoch cache in consumer
> that does not persist across restarts. Plus, we only store last committed
> offset (either internally or users can store externally). This has a
> limitation that the consumer will not always be able to find point of
> truncation just because we have a limited history (just one data point).
>
>
> So, maybe we should first agree on whether we accept that storing last
> committed offset/leader epoch has a limitation that the consumer will not
> be able to detect log truncation in all cases?
>
>
> Thanks,
>
> Anna
>
> On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <a...@confluent.io> wrote:
>
> > Hi Dong,
> >
> > Thanks for the follow up! I finally have a much clearer understanding of
> > where you are coming from.
> >
> > You are right. The success of findOffsets()/finding a point of
> > non-divergence depends on whether we have enough entries in the consumer's
> > leader epoch cache. However, I think this is a fundamental limitation of
> > having a leader epoch cache that does not persist across consumer restarts.
> >
> > If we consider the general case where consumer may or may not have this
> > cache, then I see two paths:
> > 1) Letting the user track the leader epoch history externally, and have
> > more exposure to leader epoch and finding point of non-divergence in
> > KafkaConsumer API. I understand this is the case you were talking about.
> >
> >
> >
> > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> >> Hey Anna,
> >>
> >> Thanks much for your detailed explanation and example! It does help me
> >> understand the difference between our understandings.
> >>
> >> So it seems that the solution based on findOffsets() currently focuses
> >> mainly on the scenario that consumer has cached leaderEpoch -> offset
> >> mapping whereas I was thinking about the general case where consumer may or
> >> may not have this cache. I guess that is why we have different
> >> understanding here. I have some comments below.
> >>
> >>
> >> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed by
> >> seek(offset) works if consumer has the cached leaderEpoch -> offset
> >> mapping. But if we assume consumer has this cache, do we need to have
> >> leaderEpoch in the findOffsets(...)? Intuitively, the findOffsets(offset)
> >> can also derive the leaderEpoch using offset just like the proposed
> >> solution does with seek(offset).
> >>
> >>
> >> 4) If consumer does not have cached leaderEpoch -> offset mapping, which is
> >> the case if consumer is restarted on a new machine, then it is not clear
> >> what leaderEpoch would be included in the FetchRequest if consumer does
> >> seek(offset). This is the case that motivates the first question of the
> >> previous email. In general, maybe we should discuss the final solution that
> >> covers all cases?
> >>
> >>
> >> 5) The second question in my previous email is related to the following
> >> paragraph:
> >>
> >> "... In some cases, offsets returned from position() could be actual
> >> consumed messages by this consumer identified by {offset, leader epoch}. In
> >> other cases, position() returns offset that was not actually consumed.
> >> Suppose, the user calls position() for the last offset...".
> >>
> >> I guess my point is that, if user calls position() for the last offset and
> >> uses that offset in seek(...), then user can probably just call
> >> Consumer#seekToEnd() without calling position() and seek(...). Similarly
> >> user can call Consumer#seekToBeginning() to seek to the earliest
> >> position without calling position() and seek(...). Thus position() only
> >> needs to return the actual consumed messages identified by {offset, leader
> >> epoch}. Does this make sense?
> >>
> >>
> >> Thanks,
> >> Dong
> >>
> >>
> >> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <a...@confluent.io> wrote:
> >>
> >> > Hi Dong,
> >> >
> >> >
> >> > Thanks for considering my suggestions.
> >> >
> >> >
> >> > Based on your comments, I realized that my suggestion was not complete with
> >> > regard to KafkaConsumer API vs. consumer-broker protocol. While I propose
> >> > to keep KafkaConsumer#seek() unchanged and take offset only, the underlying
> >> > consumer will send the next FetchRequest() to broker with offset and
> >> > leaderEpoch if it is known (based on leader epoch cache in consumer). Note
> >> > that this is different from the current KIP, which suggests always sending
> >> > unknown leader epoch after seek(). This way, if the consumer and a broker
> >> > agreed on the point of non-divergence, which is some {offset, leaderEpoch}
> >> > pair, the new leader which causes another truncation (even further back)
> >> > will be able to detect new divergence and restart the process of finding
> >> > the new point of non-divergence. So, to answer your question, if the
> >> > truncation happens just after the user calls
> >> > KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset),
> >> > the user will not seek to the wrong position without knowing that
> >> > truncation has happened, because the consumer will get another truncation
> >> > error, and seek again.
> >> >
> >> >
> >> > I am afraid I did not understand your second question. Let me summarize my
> >> > suggestions again, and then give an example to hopefully make my
> >> > suggestions more clear. Also, the last part of my example shows how the
> >> > use-case in your first question will work. If it does not answer your
> >> > second question, would you mind clarifying? I am also focusing on the case
> >> > of a consumer having enough entries in the cache. The case of restarting
> >> > from committed offset either stored externally or internally will probably
> >> > need to be discussed more.
> >> >
> >> >
> >> > Let me summarize my suggestion again:
> >> >
> >> > 1) KafkaConsumer#seek() and KafkaConsumer#position() remain unchanged
> >> >
> >> > 2) New KafkaConsumer#findOffsets() takes {offset, leaderEpoch} pair per
> >> > topic partition and returns offset per topic partition.
> >> >
> >> > 3) FetchRequest() to broker after KafkaConsumer#seek() will contain the
> >> > offset set by seek and leaderEpoch that corresponds to the offset based on
> >> > leader epoch cache in the consumer.
> >> >
> >> >
> >> > The rest of this e-mail is a long and contrived example with several log
> >> > truncations and unclean leader elections to illustrate the API and your
> >> > first use-case. Suppose we have three brokers. Initially, Brokers A, B, and
> >> > C have one message at offset 0 with leader epoch 0. Then, Broker A goes down
> >> > for some time. Broker B becomes a leader with epoch 1, and writes messages
> >> > to offsets 1 and 2. Broker C fetches offset 1, but before fetching offset
> >> > 2, becomes a leader with leader epoch 2 and writes a message at offset 2.
> >> > Here is the state of brokers at this point:
> >> >
> >> > > Broker A:
> >> > > offset 0, epoch 0 <- leader
> >> > > goes down…
> >> >
> >> > > Broker B:
> >> > > offset 0, epoch 0
> >> > > offset 1, epoch 1 <- leader
> >> > > offset 2, epoch 1
> >> >
> >> > > Broker C:
> >> > > offset 0, epoch 0
> >> > > offset 1, epoch 1
> >> > > offset 2, epoch 2 <- leader
> >> >
> >> >
> >> > Before Broker C becomes a leader with leader epoch 2, the consumer consumed
> >> > the following messages from broker A and broker B:
> >> >
> >> > {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2, leaderEpoch=1}.
> >> >
> >> > Consumer’s leader epoch cache at this point contains the following entries:
> >> >
> >> > (leaderEpoch=0, startOffset=0)
> >> >
> >> > (leaderEpoch=1, startOffset=1)
> >> >
> >> > endOffset = 3
> >> >
> >> >
> >> > Then, broker B becomes the follower of broker C, truncates and starts
> >> > fetching from offset 2.
> >> >
> >> > Consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets LOG_TRUNCATION
> >> > error from broker C.
> >> >
> >> > In response, the client calls KafkaConsumer#findOffsets(offset=3,
> >> > leaderEpoch=1). The underlying consumer sends
> >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker C responds with
> >> > {leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3,
> >> > leaderEpoch=1) returns offset=2.
> >> >
> >> > In response, consumer calls KafkaConsumer#seek(offset=2) followed by
> >> > poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to broker C.
> >> >
> >> >
> >> > I will continue with this example with the goal to answer your first
> >> > question about truncation just after findOffsets() followed by seek():
> >> >
> >> > Suppose brokers B and C go down, and broker A comes up and becomes a
> >> > leader with leader epoch 3, and writes a message to offset 1. Suppose this
> >> > happens before the consumer gets response from broker C to the previous
> >> > fetch request: FetchRequest(offset=2, leaderEpoch=1).
> >> >
> >> > Consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which
> >> > returns LOG_TRUNCATION error, because broker A has leader epoch 3 > leader
> >> > epoch in FetchRequest with starting offset = 1 < offset 2 in
> >> > FetchRequest().
> >> >
> >> > In response, the user calls KafkaConsumer#findOffsets(offset=2,
> >> > leaderEpoch=1). The underlying consumer sends
> >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker A responds with
> >> > {leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch = 0
> >> > in its cache with end offset == 1, which results in
> >> > KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset = 1.
> >> >
> >> > In response, the user calls KafkaConsumer#seek(offset=1) followed by
> >> > poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to broker A,
> >> > which responds with message at offset 1, leader epoch 3.
> >> >
> >> >
> >> > I will think some more about consumers restarting from committed offsets,
> >> > and send a follow up.
> >> >
> >> >
> >> > Thanks,
> >> >
> >> > Anna
> >> >
> >> >
> >> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com> wrote:
> >> >
> >> > > Hey Anna,
> >> > >
> >> > > Thanks much for the thoughtful reply. It makes sense to differentiate
> >> > > between "seeking to a message" and "seeking to a position". I have two
> >> > > questions here:
> >> > >
> >> > > - For "seeking to a message" use-case, with the proposed approach user
> >> > > needs to call findOffset(offset, leaderEpoch) followed by seek(offset). If
> >> > > message truncation and message append happen immediately after
> >> > > findOffset(offset, leaderEpoch) but before seek(offset), it seems that
> >> > > user will seek to the wrong message without knowing the truncation has
> >> > > happened. Would this be a problem?
> >> > >
> >> > > - For "seeking to a position" use-case, it seems that there can be two
> >> > > positions, i.e. earliest and latest. So these two cases can be
> >> > > fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it
> >> > > seems that user will only need to call position() and seek() for "seeking
> >> > > to a message" use-case?
> >> > >
> >> > > Thanks,
> >> > > Dong
> >> > >
> >> > >
> >> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io>
> >> wrote:
> >> > >
> >> > > > Hi Jason and Dong,
> >> > > >
> >> > > >
> >> > > > I’ve been thinking about your suggestions and discussion regarding
> >> > > > position(), seek(), and new proposed API.
> >> > > >
> >> > > >
> >> > > > Here is my thought process why we should keep position() and seek() API
> >> > > > unchanged.
> >> > > >
> >> > > >
> >> > > > I think we should separate {offset, leader epoch} that uniquely identifies
> >> > > > a message from an offset that is a position. In some cases, offsets
> >> > > > returned from position() could be actual consumed messages by this consumer
> >> > > > identified by {offset, leader epoch}. In other cases, position() returns
> >> > > > offset that was not actually consumed. Suppose, the user calls position()
> >> > > > for the last offset. Suppose we return {offset, leader epoch} of the
> >> > > > message currently in the log. Then, the message gets truncated before
> >> > > > consumer’s first poll(). It does not make sense for poll() to fail in this
> >> > > > case, because the log truncation did not actually happen from the consumer
> >> > > > perspective. On the other hand, as the KIP proposes, it makes sense for the
> >> > > > committed() method to return {offset, leader epoch} because those offsets
> >> > > > represent actual consumed messages.
> >> > > >
> >> > > >
> >> > > > The same argument applies to the seek() method: we are not seeking to a
> >> > > > message, we are seeking to a position.
> >> > > >
> >> > > >
> >> > > > I like the proposal to add KafkaConsumer#findOffsets() API. I am assuming
> >> > > > something like:
> >> > > >
> >> > > > Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)
> >> > > >
> >> > > > Similar to seek() and position(), I think findOffsets() should return
> >> > > > offset without leader epoch, because what we want is the offset that we
> >> > > > think is closest to the non-divergent message from the given consumed
> >> > > > message. Until the consumer actually fetches the message, we should not let
> >> > > > the consumer store the leader epoch for a message it did not consume.
> >> > > >
> >> > > >
> >> > > > So, the workflow will be:
> >> > > >
> >> > > > 1) The user gets LogTruncationException with {offset, leader epoch of the
> >> > > > previous message} (whatever we send with new FetchRecords request).
> >> > > >
> >> > > > 2) offset = findOffsets(tp -> {offset, leader epoch})
> >> > > >
> >> > > > 3) seek(offset)
> >> > > >
> >> > > >
> >> > > > For the use-case where the users store committed offsets externally:
> >> > > >
> >> > > > 1) Such users would have to track the leader epoch together with an offset.
> >> > > > Otherwise, there is no way to detect later what leader epoch was associated
> >> > > > with the message. I think it’s reasonable to ask that from users if they
> >> > > > want to detect log truncation. Otherwise, they will get the current
> >> > > > behavior.
> >> > > >
> >> > > >
> >> > > > If the users currently get an offset to be stored using position(), I see
> >> > > > two possibilities. First, they save the offset returned from position()
> >> > > > that they call before poll(). In that case, it would not be correct to
> >> > > > store {offset, leader epoch} if we would have changed position() to return
> >> > > > {offset, leader epoch} since actual fetched message could be different
> >> > > > (from the example I described earlier). So, it would be more correct to
> >> > > > call position() after poll(). However, the user already gets
> >> > > > ConsumerRecords at this point, from which the user can extract {offset,
> >> > > > leader epoch} of the last message.
> >> > > >
> >> > > >
> >> > > > So, I like the idea of adding a helper method to ConsumerRecords, as Jason
> >> > > > proposed, something like:
> >> > > >
> >> > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is
> >> > > > a data struct holding {offset, leader epoch}.
> >> > > >
> >> > > >
> >> > > > In this case, we would advise the user to follow the workflow: poll(), get
> >> > > > {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(),
> >> > > > save offset and leader epoch, process records.
> >> > > >
> >> > > >
> >> > > > 2) When the user needs to seek to the last committed offset, they call new
> >> > > > findOffsets(saved offset, leader epoch), and then seek(offset).
> >> > > >
> >> > > >
> >> > > > What do you think?
> >> > > >
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Anna
> >> > > >
> >> > > >
> >> > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com>
> >> wrote:
> >> > > >
> >> > > > > Hey Jason,
> >> > > > >
> >> > > > > Thanks much for your thoughtful explanation.
> >> > > > >
> >> > > > > Yes the solution using findOffsets(offset, leaderEpoch) also works. The
> >> > > > > advantage of this solution is that it adds only one API instead of two
> >> > > > > APIs. The concern is that its usage seems a bit more clumsy for advanced
> >> > > > > users. More specifically, advanced users who store offsets externally will
> >> > > > > always need to call findOffsets() before calling seek(offset) during
> >> > > > > consumer initialization. And those advanced users will need to manually
> >> > > > > keep track of the leaderEpoch of the last ConsumerRecord.
> >> > > > >
> >> > > > > The other solution, which may be more user-friendly for advanced users, is
> >> > > > > to add two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) =
> >> > > > > offsetEpochs(topicPartition)`.
> >> > > > >
> >> > > > > I kind of prefer the second solution because it is easier to use for
> >> > > > > advanced users. If we need to expose leaderEpoch anyway to safely identify
> >> > > > > a message, it may be conceptually simpler to expose it directly in
> >> > > > > seek(...) rather than requiring one more translation using
> >> > > > > findOffsets(...). But I am also OK with the first solution if other
> >> > > > > developers also favor that one :)
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Dong
> >> > > > >
> >> > > > >
> >> > > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <
> >> ja...@confluent.io
> >> > >
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi Dong,
> >> > > > > >
> >> > > > > > Thanks, I've been thinking about your suggestions a bit. It is challenging
> >> > > > > > to make this work given the current APIs. One of the difficulties is that
> >> > > > > > we don't have an API to find the leader epoch for a given offset at the
> >> > > > > > moment. So if the user does a seek to offset 5, then we'll need a new API
> >> > > > > > to find the corresponding epoch in order to fulfill the new position() API.
> >> > > > > > Potentially we could modify ListOffsets to enable finding the leader epoch,
> >> > > > > > but I am not sure it is worthwhile. Perhaps it is reasonable for advanced
> >> > > > > > usage to expect that the epoch information, if needed, will be extracted
> >> > > > > > from the records directly? It might make sense to expose a helper in
> >> > > > > > `ConsumerRecords` to make this a little easier though.
> >> > > > > >
> >> > > > > > Alternatively, if we think it is important to have this information exposed
> >> > > > > > directly, we could create batch APIs to solve the naming problem. For
> >> > > > > > example:
> >> > > > > >
> >> > > > > > Map<TopicPartition, OffsetAndEpoch> positions();
> >> > > > > > void seek(Map<TopicPartition, OffsetAndEpoch> positions);
> >> > > > > >
> >> > > > > > However, I'm actually leaning toward leaving the seek() and position() APIs
> >> > > > > > unchanged. Instead, we can add a new API to search for offset by timestamp
> >> > > > > > or by offset/leader epoch. Let's say we call it `findOffsets`. If the user
> >> > > > > > hits a log truncation error, they can use this API to find the closest
> >> > > > > > offset and then do a seek(). At the same time, we deprecate the
> >> > > > > > `offsetsForTimes` APIs. We now have two use cases which require finding
> >> > > > > > offsets, so I think we should make this API general and leave the door open
> >> > > > > > for future extensions.
> >> > > > > >
> >> > > > > > By the way, I'm unclear about the desire to move part of this functionality
> >> > > > > > to AdminClient. Guozhang suggested this previously, but I think it only
> >> > > > > > makes sense for cross-cutting capabilities such as topic creation. If we
> >> > > > > > have an API which is primarily useful to consumers, then I think that's
> >> > > > > > where it should be exposed. The AdminClient also has its own API integrity
> >> > > > > > and should not become a dumping ground for advanced use cases. I'll update
> >> > > > > > the KIP with the `findOffsets` API suggested above and we can see if it
> >> > > > > > does a good enough job of keeping the API simple for common cases.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Jason
> >> > > > > >
> >> > > > > >
> >> > > > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <
> lindon...@gmail.com>
> >> > > wrote:
> >> > > > > >
> >> > > > > > > Hey Jason,
> >> > > > > > >
> >> > > > > > > Regarding seek(...), it seems that we want an API for user to initialize
> >> > > > > > > consumer with (offset, leaderEpoch) and that API should allow throwing
> >> > > > > > > PartitionTruncationException. Suppose we agree on this, then
> >> > > > > > > seekToNearest() is not sufficient because it will always swallow
> >> > > > > > > PartitionTruncationException. Here we have two options. The first option is
> >> > > > > > > to add API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to
> >> > > > > > > offset. The second option is to add seek(offset, leaderEpoch). It
> >> > > > > > > seems that the second option may be simpler because it makes it clear that
> >> > > > > > > (offset, leaderEpoch) will be used to identify consumer's position in a
> >> > > > > > > partition. And user only needs to handle PartitionTruncationException from
> >> > > > > > > the poll(). In comparison the first option seems a bit harder to use
> >> > > > > > > because user has to also handle the PartitionTruncationException if
> >> > > > > > > offsetsForLeaderEpochs() returns different offset from user-provided
> >> > > > > > > offset. What do you think?
> >> > > > > > >
> >> > > > > > > If we decide to add API seek(offset, leaderEpoch), then we can decide
> >> > > > > > > whether and how to add API to translate (offset, leaderEpoch) to offset. It
> >> > > > > > > seems that this API will be needed by advanced users who don't want auto
> >> > > > > > > offset reset (so that they can be notified) but still want to reset offset
> >> > > > > > > to closest. For those users it probably makes sense to only have the API in
> >> > > > > > > AdminClient. offsetsForTimes() seems like a common API that will be needed
> >> > > > > > > by users of the consumer in general, so it may be more reasonable to stay in
> >> > > > > > > the consumer API. I don't have a strong opinion on whether
> >> > > > > > > offsetsForTimes() should be replaced by API in AdminClient.
> >> > > > > > >
> >> > > > > > > Though (offset, leaderEpoch) is needed to uniquely identify a message in
> >> > > > > > > general, it is only needed for advanced users who have turned on unclean
> >> > > > > > > leader election, need to use seek(..), and don't want auto offset reset.
> >> > > > > > > Most other users probably just want to enable auto offset reset and store
> >> > > > > > > offset in Kafka. Thus we might want to keep the existing offset-only APIs
> >> > > > > > > (e.g. seek() and position()) for most users while adding new APIs for
> >> > > > > > > advanced users. And yes, it seems that we need new name for position().
> >> > > > > > >
> >> > > > > > > Though I think we need new APIs to carry the new information (e.g.
> >> > > > > > > leaderEpoch), I am not very sure what that should look like. One possible
> >> > > > > > > option is those APIs in KIP-232. Another option is something like this:
> >> > > > > > >
> >> > > > > > > `````
> >> > > > > > > class OffsetEpochs {
> >> > > > > > >   long offset;
> >> > > > > > >   int leaderEpoch;
> >> > > > > > >   int partitionEpoch;   // This may be needed later as discussed in KIP-232
> >> > > > > > >   ... // Hopefully these are all we need to identify message in Kafka. But
> >> > > > > > >       // if we need more then we can add new fields in this class.
> >> > > > > > > }
> >> > > > > > >
> >> > > > > > > OffsetEpochs offsetEpochs(TopicPartition);
> >> > > > > > >
> >> > > > > > > void seek(TopicPartition, OffsetEpochs);
> >> > > > > > > `````
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Dong
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <
> >> > > > ja...@confluent.io>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hey Dong,
> >> > > > > > > >
> >> > > > > > > > Thanks for the feedback. The first three points are easy:
> >> > > > > > > >
> >> > > > > > > > 1. Yes, we should be consistent.
> >> > > > > > > > 2. Yes, I will add this.
> >> > > > > > > > 3. Yes, I think we should document the changes to the committed offset
> >> > > > > > > > schema. I meant to do this, but it slipped my mind.
> >> > > > > > > >
> >> > > > > > > > The latter questions are tougher. One option I was considering is to have
> >> > > > > > > > only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new
> >> > > > > > > > seek() API. That seems more consistent with the current use of
> >> > > > > > > > `offsetsForTimes` (we don't have a separate `seekToTimestamp` API). An
> >> > > > > > > > alternative might be to take a page from the AdminClient API and add a new
> >> > > > > > > > method to generalize offset lookup. For example, we could have
> >> > > > > > > > `lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes`
> >> > > > > > > > and this would open the door for future extensions without needing new
> >> > > > > > > > APIs.
> >> > > > > > > >
> >> > > > > > > > The case of position() is a little more annoying. It would have been better
> >> > > > > > > > had we let this return an object so that it is easier to extend. This is
> >> > > > > > > > the only reason I didn't add the API to the KIP. Maybe we should bite the
> >> > > > > > > > bullet and fix this now? Unfortunately we'll have to come up with a new
> >> > > > > > > > name. Maybe `currentPosition`?
> >> > > > > > > >
> >> > > > > > > > Thoughts?
> >> > > > > > > >
> >> > > > > > > > -Jason
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <
> >> > lindon...@gmail.com>
> >> > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Regarding points 4) and 5) above, motivation for the alternative APIs is
> >> > > > > > > > > that, if we decide that leaderEpoch is equally important as offset in
> >> > > > > > > > > identifying a message, then it may be reasonable to always specify it
> >> > > > > > > > > wherever offset is currently required in the consumer API to identify a
> >> > > > > > > > > message, e.g. position(), seek(). For example, since we allow user to
> >> > > > > > > > > retrieve offset using position() instead of asking user to keep track of
> >> > > > > > > > > the offset of the latest ConsumerRecord, maybe it will be more consistent
> >> > > > > > > > > for user to also retrieve leaderEpoch using position()?
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <
> >> > > lindon...@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hey Jason,
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks for the update. It looks pretty good. Just some minor comments
> >> > > > > > > > > > below:
> >> > > > > > > > > >
> >> > > > > > > > > > 1) The KIP adds new error code "LOG_TRUNCATION" and new exception
> >> > > > > > > > > > TruncatedPartitionException. Can we make the name more consistent, e.g.
> >> > > > > > > > > > LogTruncationException?
> >> > > > > > > > > >
> >> > > > > > > > > > 2) Do we need to add UnknownLeaderEpochException as part of API change?
> >> > > > > > > > > >
> >> > > > > > > > > > 3) Not sure if the offset topic schema is also public API. If so, maybe we
> >> > > > > > > > > > should also include the schema change in the API?
> >> > > > > > > > > >
> >> > > > > > > > > > 4) For users who store offset externally, currently they get offset using
> >> > > > > > > > > > position(..), store the offset externally, and use seek(..) to initialize
> >> > > > > > > > > > the consumer next time. After this KIP they will need to store and use the
> >> > > > > > > > > > leaderEpoch together with the offset. Should we also update the API so that
> >> > > > > > > > > > user can also get leaderEpoch from position(...)? Not sure if it is OK to
> >> > > > > > > > > > ask user to track the latest leaderEpoch of ConsumerRecord by themselves.
> >> > > > > > > > > >
> >> > > > > > > > > > 5) Also for users who store offset externally, they
> >> need to
> >> > > > call
> >> > > > > > > > seek(..)
> >> > > > > > > > > > with leaderEpoch to initialize consumer. With current
> >> KIP
> >> > > users
> >> > > > > > need
> >> > > > > > > to
> >> > > > > > > > > > call seekToNearest(), whose name suggests that the
> final
> >> > > > position
> >> > > > > > may
> >> > > > > > > > be
> >> > > > > > > > > > different from what was requested. However, if users
> may
> >> > want
> >> > > > to
> >> > > > > > > avoid
> >> > > > > > > > > auto
> >> > > > > > > > > > offset reset and be notified explicitly when there is
> >> log
> >> > > > > > truncation,
> >> > > > > > > > > then seekToNearest()
> >> > > > > > > > > > probably does not help here. Would it make sense to
> >> replace
> >> > > > > > > > > seekToNearest()
> >> > > > > > > > > > with seek(offset, leaderEpoch) + AdminClient.
> >> > > > > > > > offsetsForLeaderEpochs(...)?
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks,
> >> > > > > > > > > > Dong
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <
> >> > > > > > ja...@confluent.io
> >> > > > > > > >
> >> > > > > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > >> Hey Guozhang,
> >> > > > > > > > > >>
> >> > > > > > > > > >> That's fair. In fact, perhaps we do not need this API
> >> at
> >> > > all.
> >> > > > We
> >> > > > > > > > already
> >> > > > > > > > > >> have the new seek() in this KIP which can do the
> lookup
> >> > > based
> >> > > > on
> >> > > > > > > epoch
> >> > > > > > > > > for
> >> > > > > > > > > >> this use case. I guess we should probably call it
> >> > > > > seekToNearest()
> >> > > > > > > > though
> >> > > > > > > > > >> to
> >> > > > > > > > > >> make it clear that the final position may be
> different
> >> > from
> >> > > > what
> >> > > > > > was
> >> > > > > > > > > >> requested.
> >> > > > > > > > > >>
> >> > > > > > > > > >> Thanks,
> >> > > > > > > > > >> Jason
> >> > > > > > > > > >>
> >> > > > > > > > > >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <
> >> > > > > > wangg...@gmail.com>
> >> > > > > > > > > >> wrote:
> >> > > > > > > > > >>
> >> > > > > > > > > >> > Hi Jason,
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > I think it is less worthwhile to add
> >> > > > > > > KafkaConsumer#offsetsForLeader
> >> > > > > > > > > >> Epochs,
> >> > > > > > > > > >> > since probably only very advanced users are aware
> of
> >> the
> >> > > > > > > > leaderEpoch,
> >> > > > > > > > > >> and
> >> > > > > > > > > >> > hence ever care to use it anyways. It is more like
> an
> >> > > admin
> >> > > > > > client
> >> > > > > > > > > >> > operation than a consumer client operation: if the
> >> > > > motivation
> >> > > > > is
> >> > > > > > > to
> >> > > > > > > > > >> > facilitate customized reset policy, maybe adding it
> as
> >> > > > > > > > > >> > AdminClient#offsetsForLeaderEpochs
> >> > > > > > > > > >> > is better as it is not an aggressive assumption
> that
> >> for
> >> > > > such
> >> > > > > > > > advanced
> >> > > > > > > > > >> > users they are willing to use some admin client to
> >> get
> >> > > > further
> >> > > > > > > > > >> information?
> >> > > > > > > > > >> >
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > Guozhang
> >> > > > > > > > > >> >
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <
> >> > > > > > > > ja...@confluent.io>
> >> > > > > > > > > >> > wrote:
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > > Thanks for the feedback. I've updated the KIP.
> >> > > > Specifically
> >> > > > > I
> >> > > > > > > > > removed
> >> > > > > > > > > >> the
> >> > > > > > > > > >> > > "closest" reset option and the proposal to reset
> by
> >> > > > > timestamp
> >> > > > > > > when
> >> > > > > > > > > the
> >> > > > > > > > > >> > > precise truncation point cannot be determined.
> >> > Instead,
> >> > > I
> >> > > > > > > proposed
> >> > > > > > > > > >> that
> >> > > > > > > > > >> > we
> >> > > > > > > > > >> > > always reset using the nearest epoch when a reset
> >> > policy
> >> > > > is
> >> > > > > > > > defined
> >> > > > > > > > > >> > (either
> >> > > > > > > > > >> > > "earliest" or "latest"). Does that sound
> >> reasonable?
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > > One thing I am still debating is whether it would
> >> be
> >> > > > better
> >> > > > > to
> >> > > > > > > > have
> >> > > > > > > > > a
> >> > > > > > > > > >> > > separate API to find the closest offset using the
> >> > leader
> >> > > > > > epoch.
> >> > > > > > > In
> >> > > > > > > > > the
> >> > > > > > > > > >> > > current KIP, I suggested to piggyback this
> >> information
> >> > > on
> >> > > > an
> >> > > > > > > > > >> exception,
> >> > > > > > > > > >> > but
> >> > > > > > > > > >> > > I'm beginning to think it would be better not to
> >> hide
> >> > > the
> >> > > > > > > lookup.
> >> > > > > > > > It
> >> > > > > > > > > >> is
> >> > > > > > > > > >> > > awkward to implement since it means delaying the
> >> > > exception
> >> > > > > and
> >> > > > > > > the
> >> > > > > > > > > API
> >> > > > > > > > > >> > may
> >> > > > > > > > > >> > > actually be useful when customizing reset logic
> if
> >> no
> >> > > auto
> >> > > > > > reset
> >> > > > > > > > > >> policy
> >> > > > > > > > > >> > is
> >> > > > > > > > > >> > > defined. I was thinking we can add an API like
> the
> >> > > > > following:
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > > Map<TopicPartition, OffsetAndEpoch>
> >> > > > > > > > > >> > > offsetsForLeaderEpochs(Map<TopicPartition,
> Integer>
> >> > > > > > > > epochsToSearch)
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > > Thoughts?
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > > -Jason
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason
> Gustafson <
> >> > > > > > > > > ja...@confluent.io
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > > wrote:
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > > > @Dong
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > > Those are fair points. Both approaches require
> >> some
> >> > > > > > fuzziness
> >> > > > > > > to
> >> > > > > > > > > >> reset
> >> > > > > > > > > >> > > the
> >> > > > > > > > > >> > > > offset in these pathological scenarios and we
> >> cannot
> >> > > > > > guarantee
> >> > > > > > > > > >> > > > at-least-once delivery either way unless we
> have
> >> the
> >> > > > full
> >> > > > > > > > history
> >> > > > > > > > > of
> >> > > > > > > > > >> > > leader
> >> > > > > > > > > >> > > > epochs that were consumed. The KIP-101 logic
> may
> >> > > > actually
> >> > > > > be
> >> > > > > > > > more
> >> > > > > > > > > >> > > accurate
> >> > > > > > > > > >> > > > than using timestamps because it does not
> depend
> >> on
> >> > > the
> >> > > > > > > messages
> >> > > > > > > > > >> which
> >> > > > > > > > > >> > > are
> >> > > > > > > > > >> > > > written after the unclean leader election. The
> >> case
> >> > > > we're
> >> > > > > > > > talking
> >> > > > > > > > > >> about
> >> > > > > > > > > >> > > > should be extremely rare in practice anyway. I
> >> also
> >> > > > agree
> >> > > > > > that
> >> > > > > > > > we
> >> > > > > > > > > >> may
> >> > > > > > > > > >> > not
> >> > > > > > > > > >> > > > want to add new machinery if it only helps the
> >> old
> >> > > > message
> >> > > > > > > > format.
> >> > > > > > > > > >> Ok,
> >> > > > > > > > > >> > > > let's go ahead and drop the timestamp.
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > > @Guozhang
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > > * My current understanding is that, with
> unclean
> >> > > leader
> >> > > > > > > election
> >> > > > > > > > > >> turned
> >> > > > > > > > > >> > > on,
> >> > > > > > > > > >> > > >> exactly-once is out of the window since we
> >> cannot
> >> > > > > guarantee
> >> > > > > > > > that
> >> > > > > > > > > >> all
> >> > > > > > > > > >> > > >> committed message markers will not be lost.
> And
> >> > hence
> >> > > > > there
> >> > > > > > > is
> >> > > > > > > > no
> >> > > > > > > > > >> need
> >> > > > > > > > > >> > > to
> >> > > > > > > > > >> > > >> have special handling logic for LOG_TRUNCATION
> or
> >> > OOR
> >> > > > > error
> >> > > > > > > > codes
> >> > > > > > > > > >> with
> >> > > > > > > > > >> > > >> read_committed turned on. Is that right?
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > > Yes, that's right. EoS and unclean leader
> >> election
> >> > > don't
> >> > > > > mix
> >> > > > > > > > well.
> >> > > > > > > > > >> It
> >> > > > > > > > > >> > may
> >> > > > > > > > > >> > > > be worth considering separately whether we
> should
> >> > try
> >> > > to
> >> > > > > > > > reconcile
> >> > > > > > > > > >> the
> >> > > > > > > > > >> > > > transaction log following an unclean leader
> >> > election.
> >> > > At
> >> > > > > > least
> >> > > > > > > > we
> >> > > > > > > > > >> may
> >> > > > > > > > > >> > be
> >> > > > > > > > > >> > > > able to prevent dangling transactions from
> >> blocking
> >> > > > > > consumers.
> >> > > > > > > > > This
> >> > > > > > > > > >> KIP
> >> > > > > > > > > >> > > > does not address this problem.
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > > * MINOR: "if the epoch is greater than the
> >> minimum
> >> > > > > expected
> >> > > > > > > > epoch,
> >> > > > > > > > > >> that
> >> > > > > > > > > >> > > the
> >> > > > > > > > > >> > > >> new epoch does not begin at an earlier offset
> >> than
> >> > > the
> >> > > > > > fetch
> >> > > > > > > > > >> offset.
> >> > > > > > > > > >> > In
> >> > > > > > > > > >> > > >> the latter case, the leader can respond with a
> >> new
> >> > > > > > > > LOG_TRUNCATION
> >> > > > > > > > > >> > error
> >> > > > > > > > > >> > > >> code" should it be "does not begin at a later
> >> > offset
> >> > > > than
> >> > > > > > the
> >> > > > > > > > > fetch
> >> > > > > > > > > >> > > >> offset"?
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > > I think the comment is correct, though the
> >> phrasing
> >> > > may
> >> > > > be
> >> > > > > > > > > >> confusing.
> >> > > > > > > > > >> > We
> >> > > > > > > > > >> > > > know truncation has occurred if there exists a
> >> > larger
> >> > > > > epoch
> >> > > > > > > > with a
> >> > > > > > > > > >> > > starting
> >> > > > > > > > > >> > > > offset that is lower than the fetch offset. Let
> >> me
> >> > try
> >> > > > to
> >> > > > > > > > rephrase
> >> > > > > > > > > >> > this.
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > > Thanks,
> >> > > > > > > > > >> > > > Jason
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang
> <
> >> > > > > > > > > wangg...@gmail.com>
> >> > > > > > > > > >> > > wrote:
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > >> Jason, thanks for the KIP. A few comments:
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >> * I think Dong's question about whether to use
> >> > > > > > > timestamp-based
> >> > > > > > > > > >> > approach
> >> > > > > > > > > >> > > >> vs. start-offset-of-first-larger-epoch is
> >> valid;
> >> > > more
> >> > > > > > > > > >> specifically,
> >> > > > > > > > > >> > > with
> >> > > > > > > > > >> > > >> timestamp-based approach we may still be
> >> resetting
> >> > to
> >> > > an
> >> > > > > > > offset
> >> > > > > > > > > >> falling
> >> > > > > > > > > >> > > >> into
> >> > > > > > > > > >> > > >> the truncated interval, and hence we may still
> >> miss
> >> > > > some
> >> > > > > > > data,
> >> > > > > > > > > i.e.
> >> > > > > > > > > >> > not
> >> > > > > > > > > >> > > >> guaranteeing at-least-once still. With the
> >> > > > > > > > > >> > > >> start-offset-of-first-larger-epoch, I'm not
> sure
> >> > if
> >> > > it
> >> > > > > > will
> >> > > > > > > > > >> guarantee
> >> > > > > > > > > >> > > no
> >> > > > > > > > > >> > > >> valid data is missed when we have consecutive
> >> log
> >> > > > > > truncations
> >> > > > > > > > > >> (maybe
> >> > > > > > > > > >> > we
> >> > > > > > > > > >> > > >> need to look back into details of KIP-101 to
> >> figure
> >> > > it
> >> > > > > > out).
> >> > > > > > > If
> >> > > > > > > > > the
> >> > > > > > > > > >> > > latter
> >> > > > > > > > > >> > > >> can indeed guarantee at least once, we could
> >> > consider
> >> > > > > using
> >> > > > > > > > that
> >> > > > > > > > > >> > > approach.
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >> * My current understanding is that, with
> unclean
> >> > > leader
> >> > > > > > > > election
> >> > > > > > > > > >> > turned
> >> > > > > > > > > >> > > >> on,
> >> > > > > > > > > >> > > >> exactly-once is out of the window since we
> >> cannot
> >> > > > > guarantee
> >> > > > > > > > that
> >> > > > > > > > > >> all
> >> > > > > > > > > >> > > >> committed message markers will not be lost.
> And
> >> > hence
> >> > > > > there
> >> > > > > > > is
> >> > > > > > > > no
> >> > > > > > > > > >> need
> >> > > > > > > > > >> > > to
> >> > > > > > > > > >> > > >> have special handling logic for LOG_TRUNCATION
> or
> >> > OOR
> >> > > > > error
> >> > > > > > > > codes
> >> > > > > > > > > >> with
> >> > > > > > > > > >> > > >> read_committed turned on. Is that right?
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >> * MINOR: "if the epoch is greater than the
> >> minimum
> >> > > > > expected
> >> > > > > > > > > epoch,
> >> > > > > > > > > >> > that
> >> > > > > > > > > >> > > >> the
> >> > > > > > > > > >> > > >> new epoch does not begin at an earlier offset
> >> than
> >> > > the
> >> > > > > > fetch
> >> > > > > > > > > >> offset.
> >> > > > > > > > > >> > In
> >> > > > > > > > > >> > > >> the latter case, the leader can respond with a
> >> new
> >> > > > > > > > LOG_TRUNCATION
> >> > > > > > > > > >> > error
> >> > > > > > > > > >> > > >> code" should it be "does not begin at a later
> >> > offset
> >> > > > than
> >> > > > > > the
> >> > > > > > > > > fetch
> >> > > > > > > > > >> > > >> offset"?
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >> Guozhang
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin <
> >> > > > > > > lindon...@gmail.com
> >> > > > > > > > >
> >> > > > > > > > > >> > wrote:
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >> > Hey Jason,
> >> > > > > > > > > >> > > >> >
> >> > > > > > > > > >> > > >> > Thanks for the explanation.
> >> > > > > > > > > >> > > >> >
> >> > > > > > > > > >> > > >> > Please correct me if this is wrong. The
> >> "unknown
> >> > > > > > truncation
> >> > > > > > > > > >> offset"
> >> > > > > > > > > >> > > >> > scenario happens when consumer does not have
> >> the
> >> > > full
> >> > > > > > > > > >> leaderEpoch ->
> >> > > > > > > > > >> > > >> offset
> >> > > > > > > > > >> > > >> > mapping. In this case we can still use the
> >> > > > > KIP-101-based
> >> > > > > > > > > >> approach to
> >> > > > > > > > > >> > > >> > truncate offset to "start offset of the
> first
> >> > > Leader
> >> > > > > > Epoch
> >> > > > > > > > > larger
> >> > > > > > > > > >> > than
> >> > > > > > > > > >> > > >> last
> >> > > > > > > > > >> > > >> > epoch of the consumer" but it may be
> >> inaccurate.
> >> > So
> >> > > > the
> >> > > > > > KIP
> >> > > > > > > > > >> chooses
> >> > > > > > > > > >> > to
> >> > > > > > > > > >> > > >> use
> >> > > > > > > > > >> > > >> > the timestamp-based approach which is also
> >> > > > best-effort.
> >> > > > > > > > > >> > > >> >
> >> > > > > > > > > >> > > >> > If this understanding is correct, for
> >> "closest"
> >> > > > offset
> >> > > > > > > reset
> >> > > > > > > > > >> policy
> >> > > > > > > > > >> > > and
> >> > > > > > > > > >> > > >> > "unknown truncation offset" scenario, I am
> >> > > wondering
> >> > > > > > > whether
> >> > > > > > > > it
> >> > may be
> >> > > > > > > > > >> > > >> > better to replace timestamp-based approach
> >> with
> >> > > > KIP-101
> >> > > > > > > based
> >> > > > > > > > > >> > > approach.
> >> > > > > > > > > >> > > >> In
> >> > > > > > > > > >> > > >> > comparison to timestamp-based approach, the
> >> > > > > KIP-101-based
> >> > > > > > > > > >> approach
> >> > > > > > > > > >> > > >> seems to
> >> > > > > > > > > >> > > >> > simplify the API a bit since user does not
> >> need
> >> > to
> >> > > > > > > understand
> >> > > > > > > > > >> > > timestamp.
> >> > > > > > > > > >> > > >> > Similar to the timestamp-based approach,
> both
> >> > > > > approaches
> >> > > > > > > are
> >> > > > > > > > > >> > > best-effort
> >> > > > > > > > > >> > > >> > and do not guarantee that consumer can
> consume
> >> > all
> >> > > > > > > messages.
> >> > > > > > > > It
> >> > > > > > > > > >> is
> >> > > > > > > > > >> > not
> >> > > > > > > > > >> > > >> like
> >> > > > > > > > > >> > > >> > KIP-279 which guarantees that follower
> broker
> >> can
> >> > > > > consume
> >> > > > > > > all
> >> > > > > > > > > >> > messages
> >> > > > > > > > > >> > > >> from
> >> > > > > > > > > >> > > >> > the leader.
> >> > > > > > > > > >> > > >> >
> >> > > > > > > > > >> > > >> > Then it seems that the remaining difference
> is
> >> > > mostly
> >> > > > > > about
> >> > > > > > > > > >> > accuracy,
> >> > > > > > > > > >> > > >> i.e.
> >> > > > > > > > > >> > > >> > how many messages will be duplicated or
> missed
> >> in
> >> > > the
> >> > > > > > > "unknown
> >> > > > > > > > > >> > > truncation
> >> > > > > > > > > >> > > >> > offset" scenario. Not sure either one is
> >> clearly
> >> > > > better
> >> > > > > > > than
> >> > > > > > > > > the
> >> > > > > > > > > >> > > other.
> >> > > > > > > > > >> > > >> > Note that there are two scenarios mentioned
> in
> >> > > > KIP-279
> >> > > > > > > which
> >> > > > > > > > > are
> >> > > > > > > > > >> not
> >> > > > > > > > > >> > > >> > addressed by KIP-101. Both scenarios require
> >> > quick
> >> > > > > > > leadership
> >> > > > > > > > > >> change
> >> > > > > > > > > >> > > >> > between brokers, which seems to suggest that
> >> the
> >> > > > offset
> >> > > > > > > based
> >> > > > > > > > > >> > obtained
> >> > > > > > > > > >> > > >> > by "start
> >> > > > > > > > > >> > > >> > offset of the first Leader Epoch larger than
> >> last
> >> > > > epoch
> >> > > > > > of
> >> > > > > > > > the
> >> > > > > > > > > >> > > consumer"
> >> > > > > > > > > >> > > >> > under these two scenarios may be very close
> to
> >> > the
> >> > > > > offset
> >> > > > > > > > > >> obtained
> >> > > > > > > > > >> > by
> >> > > > > > > > > >> > > >> the
> >> > > > > > > > > >> > > >> > message timestamp. Does this sound
> reasonable?
> >> > > > > > > > > >> > > >> >
> >> > > > > > > > > >> > > >> > Good point that users on v1 format can get
> >> > benefit
> >> > > > with
> >> > > > > > > > > timestamp
> >> > > > > > > > > >> > > based
> >> > > > > > > > > >> > > >> > approach. On the other hand it seems like a
> >> short
> >> > > > term
> >> > > > > > > > benefit
> >> > > > > > > > > >> for
> >> > > > > > > > > >> > > users
> >> > > > > > > > > >> > > >> > who have not migrated. I am just not sure
> >> whether
> >> > > it
> >> > > > is
> >> > > > > > > more
> >> > > > > > > > > >> > important
> >> > > > > > > > > >> > > >> than
> >> > > > > > > > > >> > > >> > designing a better API.
> >> > > > > > > > > >> > > >> >
> >> > > > > > > > > >> > > >> > Also, for both "latest" and "earliest" reset
> >> > > policy,
> >> > > > do
> >> > > > > > you
> >> > > > > > > > > >> think it
> >> > > > > > > > > >> > > >> would
> >> > > > > > > > > >> > > >> > make sense to also use the KIP-101 based
> >> approach
> >> > > to
> >> > > > > > > truncate
> >> > > > > > > > > >> offset
> >> > > > > > > > > >> > > for
> >> > > > > > > > > >> > > >> > the "unknown truncation offset" scenario?
> >> > > > > > > > > >> > > >> >
> >> > > > > > > > > >> > > >> >
> >> > > > > > > > > >> > > >> > Thanks,
> >> > > > > > > > > >> > > >> > Dong
> >> > > > > > > > > >> > > >> >
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >> --
> >> > > > > > > > > >> > > >> -- Guozhang
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> >
> >> > > > > > > > > >> >
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > --
> >> > > > > > > > > >> > -- Guozhang
> >> > > > > > > > > >> >
> >> > > > > > > > > >>
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
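
For reference, the truncation check discussed in the quoted thread ("truncation
has occurred if there exists a larger epoch with a starting offset that is lower
than the fetch offset", resolved to the "start offset of the first Leader Epoch
larger than last epoch of the consumer") can be sketched roughly as below. This
is only an illustration under stated assumptions: the class TruncationCheck, its
methods, and the in-memory epoch map are hypothetical names and are not part of
the KIP or of the Kafka code base. It also assumes start offsets never decrease
as the epoch grows, so checking the first larger epoch is sufficient.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka code: a leader-side view of
// "leader epoch -> start offset of that epoch".
class TruncationCheck {
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();

    // Record that the given epoch began at the given offset.
    void assign(int epoch, long startOffset) {
        epochStartOffsets.put(epoch, startOffset);
    }

    // Returns the best-effort truncation offset if the first epoch larger
    // than lastFetchedEpoch starts below fetchOffset; otherwise empty.
    OptionalLong truncationOffset(long fetchOffset, int lastFetchedEpoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(lastFetchedEpoch);
        if (next != null && next.getValue() < fetchOffset) {
            return OptionalLong.of(next.getValue());
        }
        return OptionalLong.empty();
    }
}
```

With epochs 0, 1 and 3 starting at offsets 0, 100 and 120, a consumer positioned
at (offset=150, leaderEpoch=1) would be told to fall back to offset 120, while a
consumer at (offset=110, leaderEpoch=1) would see no truncation. As noted in the
thread, this is best effort and can be inaccurate in the rare scenarios
described in KIP-279.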
