Hey Jason,

It is a great summary. The solution sounds good. I might have minor
comments regarding the method name. But we can discuss those minor points
later after we reach consensus on the high level API.

Thanks,
Dong


On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson <ja...@confluent.io>
wrote:

> Hey Anna and Dong,
>
> Thanks a lot for the great discussion. I've been hanging back a bit because
> honestly the best option hasn't seemed clear. I agree with Anna's general
> observation that there is a distinction between the position of the
> consumer and its fetch state up to that position. If you think about it, a
> committed offset actually represents both of these. The metadata is used to
> initialize the state of the consumer application and the offset initializes
> the position. Additionally, we are extending the offset commit in this KIP
> to also include the last epoch fetched by the consumer, which is used to
> initialize the internal fetch state. Of course if you do an arbitrary
> `seek` and immediately commit offsets, then there won't be a last epoch to
> commit. This seems intuitive since there is no fetch state in this case. We
> only commit fetch state when we have it.
>
> So if we think about a committed offset as initializing both the consumer's
> position and its fetch state, then the gap in the API is evidently that we
> don't have a way to initialize the consumer to a committed offset. We do it
> implicitly of course for offsets stored in Kafka, but since external
> storage is a use case we support, then we should have an explicit API as
> well. Perhaps something like this:
>
> seekToCommitted(TopicPartition, OffsetAndMetadata)
>
> In this KIP, we are proposing to allow the `OffsetAndMetadata` object to
> include the leader epoch, so I think this would have the same effect as
> Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given
> the current API? Furthermore, if we find a need for additional metadata in
> the offset commit API in the future, then we will just need to modify the
> `OffsetAndMetadata` object and we will not need a new `seek` API.
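
A minimal sketch of the idea above, assuming a committed offset carries an optional leader epoch. `CommittedOffset` and `ToyPartitionState` are hypothetical stand-ins for illustration only, not the real `OffsetAndMetadata` or `KafkaConsumer`; the sketch only shows the difference between a plain `seek` (position only) and a `seekToCommitted` (position plus fetch state).

```java
import java.util.OptionalInt;

// Hypothetical stand-in for an OffsetAndMetadata that also carries the leader epoch.
final class CommittedOffset {
    final long offset;
    final OptionalInt leaderEpoch; // empty when there was no fetch state to commit
    final String metadata;

    CommittedOffset(long offset, OptionalInt leaderEpoch, String metadata) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.metadata = metadata;
    }
}

// Toy per-partition consumer state; an assumption for illustration, not the real consumer.
final class ToyPartitionState {
    long position;                                       // next offset to fetch
    OptionalInt lastFetchedEpoch = OptionalInt.empty();  // fetch state

    // Plain seek: sets the position only, so there is no epoch to validate against.
    void seek(long offset) {
        position = offset;
        lastFetchedEpoch = OptionalInt.empty();
    }

    // Sketch of seekToCommitted: restores both the position and the fetch state.
    void seekToCommitted(CommittedOffset committed) {
        position = committed.offset;
        lastFetchedEpoch = committed.leaderEpoch;
    }
}
```

With externally stored offsets, the application would rebuild a `CommittedOffset` from its own store and pass it to `seekToCommitted`; an arbitrary `seek` followed by a commit would carry an empty epoch, matching the "no fetch state" case described earlier in the message.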
>
> With this approach, I think then we can leave the `position` API as it is.
> The position of the consumer is still just the next expected fetch offset.
> If a user needs to record additional state based on previous fetch
> progress, then they would use the result of the previous fetch to obtain
> it. This makes the dependence on fetch progress explicit. I think we could
> make this a little more convenient with a helper in the `ConsumerRecords`
> object, but I think that's more of a nice-to-have.
>
> Thoughts?
>
> By the way, I have been iterating a little bit on the replica side of this
> KIP. My initial proposal in fact did not have strong enough fencing to
> protect all of the edge cases. I believe the current proposal fixes the
> problems, but I am still verifying the model.
>
> Thanks,
> Jason
>
>
> On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Anna,
> >
> > Thanks much for the explanation. Approach 1 also sounds good to me. I
> think
> > findOffsets() is useful for users who don't use automatic offset reset
> > policy.
> >
> > Just one more question. Since users who store offsets externally need to
> > provide leaderEpoch to findOffsets(...), do we need an extra API for user
> > to get both offset and leaderEpoch, e.g. recordPosition()?
> >
> > Thanks,
> > Dong
> >
> > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner <a...@confluent.io>
> wrote:
> >
> > > Hi Dong,
> > >
> > >
> > > What I called “not covering all use cases” is what you call best-effort
> > > (not guaranteeing some corner cases). I think we are on the same page
> > here.
> > >
> > >
> > > I wanted to be clear in the API whether the consumer seeks to a
> position
> > > (offset) or to a record (offset, leader epoch). The only use-case of
> > > seeking to a record is seeking to a committed offset for a user who
> > stores
> > > committed offsets externally. (Unless users find some other reason to
> > seek
> > > to a record.) I thought it was possible to provide this functionality
> > with
> > > findOffset(offset, leader epoch) followed by a seek(offset). However,
> you
> > > are right that this will not handle the race condition where
> > non-divergent
> > > offset found by findOffset() could change again before the consumer
> does
> > > the first fetch.
> > >
> > >
> > > Regarding position() — if we add position that returns (offset, leader
> > > epoch), this is specifically a position after a record that was
> actually
> > > consumed or position of a committed record. In which case, I still
> think
> > > it’s cleaner to get a record position of consumed message from a new
> > helper
> > > method in ConsumerRecords() or from committed offsets.
> > >
> > >
> > > I think all the use-cases could be then covered with:
> > >
> > > (Approach 1)
> > >
> > > seekToRecord(offset, leaderEpoch) — this will just initialize/set the
> > > consumer state;
> > >
> > > findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}
> > >
> > >
> > > If we agree that the race condition is also a corner case, then I think
> > we
> > > can cover use-cases with:
> > >
> > > (Approach 2)
> > >
> > > findOffsets(offset, leaderEpoch) returns offset — we still want leader
> > > epoch as a parameter for the users who store their committed offsets
> > > externally.
> > >
> > >
> > > I am actually now leaning more to approach 1, since it is more
> explicit,
> > > and maybe there are more use cases for it.
> > >
> > >
> > > Thanks,
> > >
> > > Anna
> > >
> > >
> > > On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Anna,
> > > >
> > > > Thanks for the comment. To answer your question, it seems that we can
> > > cover
> > > > all cases in this KIP. As stated in the "Consumer Handling" section,
> KIP-101
> > > > based approach will be used to derive the truncation offset from the
> > > > 2-tuple (offset, leaderEpoch). This approach is best effort and it is
> > > > inaccurate only in very rare scenarios (as described in KIP-279).
> > > >
> > > > By using seek(offset, leaderEpoch), consumer will still be able to
> > follow
> > > > this best-effort approach to detect log truncation and determine the
> > > > truncation offset. On the other hand, if we use seek(offset),
> consumer
> > > will
> > > > not detect log truncation in some cases which weakens the guarantee
> of
> > > this
> > > > KIP. Does this make sense?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <a...@confluent.io>
> > wrote:
> > > >
> > > > > Sorry, I hit "send" before finishing. Continuing...
> > > > >
> > > > >
> > > > > 2) Hiding most of the consumer handling log truncation logic with
> > > minimal
> > > > > exposure in KafkaConsumer API.  I was proposing this path.
> > > > >
> > > > >
> > > > > > Before answering your specific questions… I want to respond to your
> > > > comment
> > > > > “In general, maybe we should discuss the final solution that covers
> > all
> > > > > cases?”. With current KIP, we don’t cover all cases of consumer
> > > detecting
> > > > > log truncation because the KIP proposes a leader epoch cache in
> > > consumer
> > > > > that does not persist across restarts. Plus, we only store last
> > > committed
> > > > > offset (either internally or users can store externally). This has
> a
> > > > > limitation that the consumer will not always be able to find point
> of
> > > > > truncation just because we have a limited history (just one data
> > > point).
> > > > >
> > > > >
> > > > > So, maybe we should first agree on whether we accept that storing
> > last
> > > > > committed offset/leader epoch has a limitation that the consumer
> will
> > > not
> > > > > be able to detect log truncation in all cases?
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Anna
> > > > >
> > > > > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <a...@confluent.io>
> > > wrote:
> > > > >
> > > > > > Hi Dong,
> > > > > >
> > > > > > > Thanks for the follow up! I finally have a much clearer understanding of
> > > > > > > where you are coming from.
> > > > > >
> > > > > > You are right. The success of findOffsets()/finding a point of
> > > > > > non-divergence depends on whether we have enough entries in the
> > > > > consumer's
> > > > > > leader epoch cache. However, I think this is a fundamental
> > limitation
> > > > of
> > > > > > having a leader epoch cache that does not persist across consumer
> > > > > restarts.
> > > > > >
> > > > > > If we consider the general case where consumer may or may not
> have
> > > this
> > > > > > cache, then I see two paths:
> > > > > > > 1) Letting the user track the leader epoch history externally,
> > and
> > > > > have
> > > > > > more exposure to leader epoch and finding point of non-divergence
> > in
> > > > > > KafkaConsumer API. I understand this is the case you were talking
> > > > about.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > >> Hey Anna,
> > > > > >>
> > > > > >> Thanks much for your detailed explanation and example! It does
> > help
> > > me
> > > > > > >> understand where our understanding differs.
> > > > > >>
> > > > > >> So it seems that the solution based on findOffsets() currently
> > > focuses
> > > > > >> mainly on the scenario that consumer has cached leaderEpoch ->
> > > offset
> > > > > >> mapping whereas I was thinking about the general case where
> > consumer
> > > > may
> > > > > >> or
> > > > > >> may not have this cache. I guess that is why we have different
> > > > > >> understanding here. I have some comments below.
> > > > > >>
> > > > > >>
> > > > > >> 3) The proposed solution using findOffsets(offset, leaderEpoch)
> > > > followed
> > > > > >> by
> > > > > >> seek(offset) works if consumer has the cached leaderEpoch ->
> > offset
> > > > > >> mapping. But if we assume consumer has this cache, do we need to
> > > have
> > > > > >> leaderEpoch in the findOffsets(...)? Intuitively, the
> > > > > findOffsets(offset)
> > > > > >> can also derive the leaderEpoch using offset just like the
> > proposed
> > > > > >> solution does with seek(offset).
> > > > > >>
> > > > > >>
> > > > > >> 4) If consumer does not have cached leaderEpoch -> offset
> mapping,
> > > > which
> > > > > >> is
> > > > > >> the case if consumer is restarted on a new machine, then it is
> not
> > > > clear
> > > > > >> what leaderEpoch would be included in the FetchRequest if
> consumer
> > > > does
> > > > > >> seek(offset). This is the case that motivates the first question
> > of
> > > > the
> > > > > >> previous email. In general, maybe we should discuss the final
> > > solution
> > > > > >> that
> > > > > >> covers all cases?
> > > > > >>
> > > > > >>
> > > > > >> 5) The second question in my previous email is related to the
> > > > following
> > > > > >> paragraph:
> > > > > >>
> > > > > >> "... In some cases, offsets returned from position() could be
> > actual
> > > > > >> consumed messages by this consumer identified by {offset, leader
> > > > epoch}.
> > > > > >> In
> > > > > >> other cases, position() returns offset that was not actually
> > > consumed.
> > > > > >> Suppose, the user calls position() for the last offset...".
> > > > > >>
> > > > > >> I guess my point is that, if user calls position() for the last
> > > offset
> > > > > and
> > > > > >> uses that offset in seek(...), then user can probably just call
> > > > > >> Consumer#seekToEnd() without calling position() and seek(...).
> > > > Similarly
> > > > > > >> user can call Consumer#seekToBeginning() to seek to the
> > earliest
> > > > > >> position without calling position() and seek(...). Thus
> position()
> > > > only
> > > > > >> needs to return the actual consumed messages identified by
> > {offset,
> > > > > leader
> > > > > >> epoch}. Does this make sense?
> > > > > >>
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Dong
> > > > > >>
> > > > > >>
> > > > > >> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <a...@confluent.io
> >
> > > > wrote:
> > > > > >>
> > > > > >> > Hi Dong,
> > > > > >> >
> > > > > >> >
> > > > > >> > Thanks for considering my suggestions.
> > > > > >> >
> > > > > >> >
> > > > > >> > Based on your comments, I realized that my suggestion was not
> > > > complete
> > > > > >> with
> > > > > >> > regard to KafkaConsumer API vs. consumer-broker protocol.
> While
> > I
> > > > > >> propose
> > > > > >> > to keep KafkaConsumer#seek() unchanged and take offset only,
> the
> > > > > >> underlying
> > > > > >> > consumer will send the next FetchRequest() to broker with
> offset
> > > and
> > > > > >> > leaderEpoch if it is known (based on leader epoch cache in
> > > > consumer) —
> > > > > >> note
> > > > > >> > that this is different from the current KIP, which suggests to
> > > > always
> > > > > >> send
> > > > > >> > unknown leader epoch after seek(). This way, if the consumer
> > and a
> > > > > >> broker
> > > > > >> > agreed on the point of non-divergence, which is some {offset,
> > > > > >> leaderEpoch}
> > > > > >> > pair, the new leader which causes another truncation (even
> > further
> > > > > back)
> > > > > >> > will be able to detect new divergence and restart the process
> of
> > > > > finding
> > > > > >> > the new point of non-divergence. So, to answer your question,
> If
> > > the
> > > > > >> > truncation happens just after the user calls
> > > > > >> > KafkaConsumer#findOffsets(offset, leaderEpoch) followed by
> > > > > seek(offset),
> > > > > >> > the user will not seek to the wrong position without knowing
> > that
> > > > > >> > truncation has happened, because the consumer will get another
> > > > > >> truncation
> > > > > >> > error, and seek again.
> > > > > >> >
> > > > > >> >
> > > > > > >> > I am afraid I did not understand your second question. Let me
> > > > > >> summarize my
> > > > > >> > suggestions again, and then give an example to hopefully make
> my
> > > > > >> > suggestions more clear. Also, the last part of my example
> shows
> > > how
> > > > > the
> > > > > >> > use-case in your first question will work. If it does not
> answer
> > > > your
> > > > > >> > second question, would you mind clarifying? I am also focusing
> > on
> > > > the
> > > > > >> case
> > > > > >> > of a consumer having enough entries in the cache. The case of
> > > > > restarting
> > > > > >> > from committed offset either stored externally or internally
> > will
> > > > > >> probably
> > > > > >> > need to be discussed more.
> > > > > >> >
> > > > > >> >
> > > > > >> > Let me summarize my suggestion again:
> > > > > >> >
> > > > > >> > 1) KafkaConsumer#seek() and KafkaConsumer#position() remains
> > > > unchanged
> > > > > >> >
> > > > > >> > 2) New KafkaConsumer#findOffsets() takes {offset, leaderEpoch}
> > > pair
> > > > > per
> > > > > >> > topic partition and returns offset per topic partition.
> > > > > >> >
> > > > > >> > 3) FetchRequest() to broker after KafkaConsumer#seek() will
> > > contain
> > > > > the
> > > > > >> > offset set by seek and leaderEpoch that corresponds to the
> > offset
> > > > > based
> > > > > >> on
> > > > > >> > leader epoch cache in the consumer.
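
Point 3 above can be sketched as a lookup in a consumer-side epoch cache. This is an illustration under an assumption: the epoch attached to a fetch at offset N is taken to be the epoch of the last message before N, which is consistent with the FetchRequest values in the example that follows in this message. `EpochCacheSketch` and its method names are hypothetical.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical consumer-side leader epoch cache: start offset of each epoch -> epoch.
final class EpochCacheSketch {
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();

    // Record that leaderEpoch begins at startOffset (learned from fetched records).
    void recordEpochStart(int leaderEpoch, long startOffset) {
        epochByStartOffset.put(startOffset, leaderEpoch);
    }

    // Epoch to send with a fetch at fetchOffset: the epoch of the message at
    // fetchOffset - 1, or empty ("unknown") if the cache has no entry for it.
    OptionalInt epochForFetch(long fetchOffset) {
        if (fetchOffset <= 0) {
            return OptionalInt.empty();
        }
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(fetchOffset - 1);
        return entry == null ? OptionalInt.empty() : OptionalInt.of(entry.getValue());
    }
}
```

With cache entries (leaderEpoch=0, startOffset=0) and (leaderEpoch=1, startOffset=1), a fetch at offset 3 or 2 would carry epoch 1 and a fetch at offset 1 would carry epoch 0.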
> > > > > >> >
> > > > > >> >
> > > > > >> > The rest of this e-mail is a long and contrived example with
> > > several
> > > > > log
> > > > > >> > truncations and unclean leader elections to illustrate the API
> > and
> > > > > your
> > > > > > >> > first use-case. Suppose we have three brokers. Initially, Brokers A, B,
> > > > > > >> > and C each have one message at offset 0 with leader epoch 0. Then,
> Broker
> > A
> > > > goes
> > > > > >> down
> > > > > >> > for some time. Broker B becomes a leader with epoch 1, and
> > writes
> > > > > >> messages
> > > > > >> > to offsets 1 and 2. Broker C fetches offset 1, but before
> > fetching
> > > > > >> offset
> > > > > >> > 2, becomes a leader with leader epoch 2 and writes a message
> at
> > > > offset
> > > > > >> 2.
> > > > > >> > Here is the state of brokers at this point:
> > > > > >> >
> > > > > >> > > Broker A:
> > > > > >> > > offset 0, epoch 0 <— leader
> > > > > >> > > goes down…
> > > > > >> >
> > > > > >> >
> > > > > >> > > Broker B:
> > > > > >> > > offset 0, epoch 0
> > > > > >> > > offset 1, epoch 1  <- leader
> > > > > >> > > offset 2, epoch 1
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > > >> > > Broker C:
> > > > > >> > > offset 0, epoch 0
> > > > > >> > > offset 1, epoch 1
> > > > > >> > > offset 2, epoch 2 <— leader
> > > > > >> >
> > > > > >> >
> > > > > >> > Before Broker C becomes a leader with leader epoch 2, the
> > consumer
> > > > > >> consumed
> > > > > >> > the following messages from broker A and broker B:
> > > > > >> >
> > > > > >> > {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1},
> {offset=2,
> > > > > >> > leaderEpoch=1}.
> > > > > >> >
> > > > > >> > Consumer’s leader epoch cache at this point contains the
> > following
> > > > > >> entries:
> > > > > >> >
> > > > > >> > (leaderEpoch=0, startOffset=0)
> > > > > >> >
> > > > > >> > (leaderEpoch=1, startOffset=1)
> > > > > >> >
> > > > > >> > endOffset = 3
> > > > > >> >
> > > > > >> >
> > > > > >> > Then, broker B becomes the follower of broker C, truncates and
> > > > starts
> > > > > >> > fetching from offset 2.
> > > > > >> >
> > > > > >> > Consumer sends fetchRequest(offset=3, leaderEpoch=1) and gets
> > > > > >> > LOG_TRUNCATION
> > > > > >> > error from broker C.
> > > > > >> >
> > > > > >> > In response, the client calls KafkaConsumer#findOffsets(
> > offset=3,
> > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker C responds with
> > > > > >> > {leaderEpoch=1, endOffset=2}. So,
> > > > KafkaConsumer#findOffsets(offset=3,
> > > > > >> > leaderEpoch=1) returns offset=2.
> > > > > >> >
> > > > > > >> > In response, consumer calls KafkaConsumer#seek(offset=2)
> > followed
> > > > by
> > > > > >> > poll(), which results in FetchRequest(offset=2, leaderEpoch=1)
> > to
> > > > > >> broker C.
> > > > > >> >
> > > > > >> >
> > > > > >> > I will continue with this example with the goal to answer your
> > > first
> > > > > >> > question about truncation just after findOffsets() followed by
> > > > seek():
> > > > > >> >
> > > > > >> > Suppose, brokers B and C go down, and broker A comes up and
> > > becomes
> > > > a
> > > > > >> > leader with leader epoch 3, and writes a message to offset 1.
> > > > Suppose,
> > > > > >> this
> > > > > >> > happens before the consumer gets response from broker C to the
> > > > > previous
> > > > > >> > fetch request:  FetchRequest(offset=2, leaderEpoch=1).
> > > > > >> >
> > > > > > >> > Consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A,
> > > > > > >> > which returns LOG_TRUNCATION error, because broker A has leader
> > > > > > >> > epoch 3 > leader epoch in FetchRequest, with starting offset = 1 <
> > > > > > >> > offset 2 in FetchRequest().
> > > > > >> >
> > > > > >> > In response, the user calls KafkaConsumer#findOffsets(
> offset=2,
> > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker A responds with
> > > > > >> > {leaderEpoch=0, endOffset=1}; the underlying consumer finds
> > > > > leaderEpoch
> > > > > >> = 0
> > > > > >> > in its cache with end offset == 1, which results in
> > > > > >> > KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning
> > > offset
> > > > > = 1.
> > > > > >> >
> > > > > > >> > In response, the user calls KafkaConsumer#seek(offset=1)
> > followed
> > > > by
> > > > > >> > poll(), which results in FetchRequest(offset=1, leaderEpoch=0)
> > to
> > > > > >> broker A,
> > > > > >> > which responds with message at offset 1, leader epoch 3.
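
The two findOffsets() lookups in the example above can be reproduced with a small model. This is a sketch under assumptions: the leader answers an OffsetsForLeaderEpoch-style query with the largest epoch it knows that is less than or equal to the requested one, plus that epoch's end offset, and the consumer takes the minimum of the requested offset, the leader's end offset, and (when the leader answers with an older epoch) its own cached end offset for that epoch. `EpochLog` and `FindOffsetsSketch` are hypothetical names.

```java
import java.util.TreeMap;

// Toy epoch history of a log (broker log or consumer cache): epoch -> start offset.
final class EpochLog {
    final TreeMap<Integer, Long> startOffsetByEpoch = new TreeMap<>();
    final long endOffset; // offset one past the last message

    EpochLog(long endOffset) {
        this.endOffset = endOffset;
    }

    EpochLog with(int epoch, long startOffset) {
        startOffsetByEpoch.put(epoch, startOffset);
        return this;
    }

    // Returns {epoch, endOffset} for the largest known epoch <= requestedEpoch.
    long[] endOffsetFor(int requestedEpoch) {
        Integer floor = startOffsetByEpoch.floorKey(requestedEpoch);
        if (floor == null) {
            throw new IllegalArgumentException("no epoch <= " + requestedEpoch);
        }
        int epoch = floor;
        Integer next = startOffsetByEpoch.higherKey(epoch);
        long end = (next == null) ? endOffset : startOffsetByEpoch.get(next);
        return new long[] {epoch, end};
    }
}

final class FindOffsetsSketch {
    // Sketch of findOffsets(offset, leaderEpoch) for a single partition.
    static long findOffset(EpochLog consumerCache, EpochLog leaderLog,
                           long offset, int leaderEpoch) {
        long[] reply = leaderLog.endOffsetFor(leaderEpoch);
        long candidate = Math.min(offset, reply[1]);
        if (reply[0] != leaderEpoch) {
            // Leader answered with an older epoch: bound by what the consumer saw for it.
            long[] local = consumerCache.endOffsetFor((int) reply[0]);
            candidate = Math.min(candidate, local[1]);
        }
        return candidate;
    }
}
```

Modelling broker C as epochs {0→0, 1→1, 2→2} with end offset 3, broker A (after epoch 3) as {0→0, 3→1} with end offset 2, and the consumer cache as {0→0, 1→1} with end offset 3, the two calls in the example correspond to `findOffset(cache, brokerC, 3, 1)` and `findOffset(cache, brokerA, 2, 1)`.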
> > > > > >> >
> > > > > >> >
> > > > > >> > I will think some more about consumers restarting from
> committed
> > > > > >> offsets,
> > > > > >> > and send a follow up.
> > > > > >> >
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> >
> > > > > >> > Anna
> > > > > >> >
> > > > > >> >
> > > > > >> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > > > >> >
> > > > > >> > > Hey Anna,
> > > > > >> > >
> > > > > > >> > > Thanks much for the thoughtful reply. It makes sense to differentiate
> > > > > > >> > > between "seeking to a message" and "seeking to a position". I have two
> > > > > > >> > > questions here:
> > > > > >> > >
> > > > > >> > > - For "seeking to a message" use-case, with the proposed
> > > approach
> > > > > user
> > > > > >> > > needs to call findOffset(offset, leaderEpoch) followed by
> > > > > >> seek(offset).
> > > > > >> > If
> > > > > >> > > message truncation and message append happen immediately
> after
> > > > > >> > > findOffset(offset,
> > > > > >> > > leaderEpoch) but before seek(offset), it seems that user
> will
> > > seek
> > > > > to
> > > > > >> the
> > > > > >> > > wrong message without knowing the truncation has happened.
> > Would
> > > > > this
> > > > > >> be
> > > > > >> > a
> > > > > >> > > problem?
> > > > > >> > >
> > > > > > >> > > - For "seeking to a position" use-case, it seems that there can be two
> > > > > > >> > > positions, i.e. earliest and latest. So these two cases can be
> > > > > > >> > > fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it
> > > > > > >> > > seems that user will only need to call position() and seek() for the
> > > > > > >> > > "seeking to a message" use-case?
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > > Dong
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <
> > > a...@confluent.io>
> > > > > >> wrote:
> > > > > >> > >
> > > > > >> > > > Hi Jason and Dong,
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > I’ve been thinking about your suggestions and discussion
> > > > regarding
> > > > > >> > > > position(), seek(), and new proposed API.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > Here is my thought process why we should keep position()
> and
> > > > > seek()
> > > > > >> API
> > > > > >> > > > unchanged.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > I think we should separate {offset, leader epoch} that
> > > uniquely
> > > > > >> > > identifies
> > > > > >> > > > a message from an offset that is a position. In some
> cases,
> > > > > offsets
> > > > > >> > > > returned from position() could be actual consumed messages
> > by
> > > > this
> > > > > >> > > consumer
> > > > > >> > > > identified by {offset, leader epoch}. In other cases,
> > > position()
> > > > > >> > returns
> > > > > >> > > > offset that was not actually consumed. Suppose, the user
> > calls
> > > > > >> > position()
> > > > > >> > > > for the last offset. Suppose we return {offset, leader
> > epoch}
> > > of
> > > > > the
> > > > > >> > > > message currently in the log. Then, the message gets
> > truncated
> > > > > >> before
> > > > > >> > > > consumer’s first poll(). It does not make sense for poll()
> > to
> > > > fail
> > > > > >> in
> > > > > >> > > this
> > > > > >> > > > case, because the log truncation did not actually happen
> > from
> > > > the
> > > > > >> > > consumer
> > > > > >> > > > perspective. On the other hand, as the KIP proposes, it
> > makes
> > > > > sense
> > > > > >> for
> > > > > >> > > the
> > > > > >> > > > committed() method to return {offset, leader epoch}
> because
> > > > those
> > > > > >> > offsets
> > > > > >> > > > represent actual consumed messages.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > The same argument applies to the seek() method — we are
> not
> > > > > seeking
> > > > > >> to
> > > > > >> > a
> > > > > >> > > > message, we are seeking to a position.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > I like the proposal to add KafkaConsumer#findOffsets()
> API.
> > I
> > > am
> > > > > >> > assuming
> > > > > >> > > > something like:
> > > > > >> > > >
> > > > > >> > > > Map<TopicPartition, Long> findOffsets(Map<TopicPartition,
> > > > > >> > OffsetAndEpoch>
> > > > > >> > > > offsetsToSearch)
> > > > > >> > > >
> > > > > >> > > > Similar to seek() and position(), I think findOffsets()
> > should
> > > > > >> return
> > > > > >> > > > offset without leader epoch, because what we want is the
> > > offset
> > > > > >> that we
> > > > > >> > > > think is closest to the not divergent message from the
> given
> > > > > >> consumed
> > > > > >> > > > message. Until the consumer actually fetches the message,
> we
> > > > > should
> > > > > >> not
> > > > > >> > > let
> > > > > >> > > > the consumer store the leader epoch for a message it did
> not
> > > > > >> consume.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > So, the workflow will be:
> > > > > >> > > >
> > > > > >> > > > 1) The user gets LogTruncationException with {offset,
> leader
> > > > epoch
> > > > > >> of
> > > > > >> > the
> > > > > >> > > > previous message} (whatever we send with new FetchRecords
> > > > > request).
> > > > > >> > > >
> > > > > >> > > > 2) offset = findOffsets(tp -> {offset, leader epoch})
> > > > > >> > > >
> > > > > >> > > > 3) seek(offset)
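
The three-step workflow above can be sketched as a retry loop. Everything here is hypothetical: `ToyConsumer` is a single-partition stand-in for the proposed API, the `LogTruncationException` fields are assumed, and `FakeConsumer` only simulates one truncation so the loop can be exercised.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical exception carrying the {offset, leader epoch} that failed validation.
class LogTruncationException extends RuntimeException {
    final long offset;
    final int leaderEpoch;

    LogTruncationException(long offset, int leaderEpoch) {
        super("log truncated before offset " + offset);
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

// Hypothetical single-partition view of the API under discussion.
interface ToyConsumer {
    List<String> poll();
    long findOffset(long offset, int leaderEpoch);
    void seek(long offset);
}

final class TruncationWorkflowSketch {
    // Step 1: poll fails with a truncation error; step 2: findOffset; step 3: seek.
    static List<String> pollWithRecovery(ToyConsumer consumer, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return consumer.poll();
            } catch (LogTruncationException e) {
                long resumeAt = consumer.findOffset(e.offset, e.leaderEpoch);
                consumer.seek(resumeAt);
            }
        }
        throw new IllegalStateException("still truncated after " + maxAttempts + " attempts");
    }
}

// Fake consumer that reports one truncation at offset 3 and then serves records.
final class FakeConsumer implements ToyConsumer {
    long position = 3;
    boolean truncationPending = true;
    final List<Long> seeks = new ArrayList<>();

    public List<String> poll() {
        if (truncationPending) {
            truncationPending = false;
            throw new LogTruncationException(position, 1);
        }
        return Collections.singletonList("record@" + position);
    }

    public long findOffset(long offset, int leaderEpoch) {
        return Math.min(offset, 2L); // pretend the log now ends at offset 2 for this epoch
    }

    public void seek(long offset) {
        position = offset;
        seeks.add(offset);
    }
}
```

The retry bound is a design choice in this sketch: a second truncation can occur between findOffset and the next fetch, so the loop repeats the three steps rather than assuming one pass is enough.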
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > For the use-case where the users store committed offsets
> > > > > externally:
> > > > > >> > > >
> > > > > >> > > > 1) Such users would have to track the leader epoch
> together
> > > with
> > > > > an
> > > > > >> > > offset.
> > > > > >> > > > Otherwise, there is no way to detect later what leader
> epoch
> > > was
> > > > > >> > > associated
> > > > > >> > > > with the message. I think it’s reasonable to ask that from
> > > users
> > > > > if
> > > > > >> > they
> > > > > >> > > > want to detect log truncation. Otherwise, they will get
> the
> > > > > current
> > > > > >> > > > behavior.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > If the users currently get an offset to be stored using
> > > > > position(),
> > > > > >> I
> > > > > >> > see
> > > > > > >> > > > two possibilities. First, they save the offset returned from
> > > > > > >> > > > position(), which they call before poll(). In that case, it would not
> be
> > > > > correct
> > > > > >> to
> > > > > >> > > > store {offset, leader epoch} if we would have changed
> > > position()
> > > > > to
> > > > > >> > > return
> > > > > >> > > > {offset, leader epoch} since actual fetched message could
> be
> > > > > >> different
> > > > > >> > > > (from the example I described earlier). So, it would be
> more
> > > > > >> correct to
> > > > > >> > > > call position() after poll(). However, the user already
> gets
> > > > > >> > > > ConsumerRecords at this point, from which the user can
> > extract
> > > > > >> {offset,
> > > > > >> > > > leader epoch} of the last message.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > So, I like the idea of adding a helper method to
> > > > ConsumerRecords,
> > > > > as
> > > > > >> > > Jason
> > > > > >> > > > proposed, something like:
> > > > > >> > > >
> > > > > >> > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where
> > > > > >> OffsetAndEpoch
> > > > > >> > > is
> > > > > >> > > > a data struct holding {offset, leader epoch}.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > In this case, we would advise the user to follow the
> > workflow:
> > > > > >> poll(),
> > > > > >> > > get
> > > > > >> > > > {offset, leader epoch} from ConsumerRecords#lastOffsetWith
> > > > > >> > LeaderEpoch(),
> > > > > >> > > > save offset and leader epoch, process records.
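
A minimal sketch of the helper described above, assuming it simply reports the {offset, leader epoch} of the last record in a fetched batch. `ToyRecord` and `ConsumerRecordsSketch` are hypothetical stand-ins, not the real `ConsumerRecord` or `ConsumerRecords` classes; `OffsetAndEpoch` follows the data struct named in the message.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a consumed record.
final class ToyRecord {
    final long offset;
    final int leaderEpoch;
    final String value;

    ToyRecord(long offset, int leaderEpoch, String value) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.value = value;
    }
}

// Data struct holding {offset, leader epoch}, as described in the message.
final class OffsetAndEpoch {
    final long offset;
    final int leaderEpoch;

    OffsetAndEpoch(long offset, int leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

final class ConsumerRecordsSketch {
    // Returns {offset, leader epoch} of the last record in the batch,
    // or empty if the poll returned nothing.
    static Optional<OffsetAndEpoch> lastOffsetWithLeaderEpoch(List<ToyRecord> records) {
        if (records.isEmpty()) {
            return Optional.empty();
        }
        ToyRecord last = records.get(records.size() - 1);
        return Optional.of(new OffsetAndEpoch(last.offset, last.leaderEpoch));
    }
}
```

A user storing offsets externally would save the returned pair after each poll() and before processing the records; an empty result means there is nothing new to save.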
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > 2) When the user needs to seek to the last committed
> offset,
> > > > they
> > > > > >> call
> > > > > >> > > new
> > > > > >> > > > findOffsets(saved offset, leader epoch), and then
> > > seek(offset).
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > What do you think?
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > >
> > > > > >> > > > Anna
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <
> > lindon...@gmail.com>
> > > > > >> wrote:
> > > > > >> > > >
> > > > > >> > > > > Hey Jason,
> > > > > >> > > > >
> > > > > >> > > > > Thanks much for your thoughtful explanation.
> > > > > >> > > > >
> > > > > >> > > > > Yes the solution using findOffsets(offset, leaderEpoch)
> > also
> > > > > >> works.
> > > > > >> > The
> > > > > > >> > > > > advantage of this solution is that it adds only one API instead
> of
> > > two
> > > > > >> APIs.
> > > > > >> > > The
> > > > > >> > > > > concern is that its usage seems a bit more clumsy for
> > > advanced
> > > > > >> users.
> > > > > >> > > > More
> > > > > >> > > > > specifically, advanced users who store offsets
> externally
> > > will
> > > > > >> always
> > > > > >> > > > need
> > > > > >> > > > > to call findOffsets() before calling seek(offset) during
> > > > > consumer
> > > > > >> > > > > initialization. And those advanced users will need to
> > > manually
> > > > > >> keep
> > > > > >> > > track
> > > > > >> > > > > of the leaderEpoch of the last ConsumerRecord.
> > > > > >> > > > >
> > > > > > >> > > > > The other solution, which may be more user-friendly for advanced
> > > > > > >> > > > > users, is to add two APIs, `void seek(offset, leaderEpoch)` and
> > > > > > >> > > > > `(offset, epoch) = offsetEpochs(topicPartition)`.
> > > > > >> > > > >
> > > > > >> > > > > I kind of prefer the second solution because it is
> easier
> > to
> > > > use
> > > > > >> for
> > > > > >> > > > > advanced users. If we need to expose leaderEpoch anyway
> to
> > > > > safely
> > > > > >> > > > identify
> > > > > >> > > > > a message, it may be conceptually simpler to expose it
> > > > directly
> > > > > in
> > > > > >> > > > > seek(...) rather than requiring one more translation
> using
> > > > > >> > > > > findOffsets(...). But I am also OK with the first
> solution
> > > if
> > > > > >> other
> > > > > >> > > > > developers also favor that one :)
> > > > > >> > > > >
> > > > > >> > > > > Thanks,
> > > > > >> > > > > Dong
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <
> > > > > >> ja...@confluent.io
> > > > > >> > >
> > > > > >> > > > > wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hi Dong,
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks, I've been thinking about your suggestions a
> bit.
> > > It
> > > > is
> > > > > >> > > > > challenging
> > > > > >> > > > > > to make this work given the current APIs. One of the
> > > > > >> difficulties
> > > > > >> > is
> > > > > >> > > > that
> > > > > >> > > > > > we don't have an API to find the leader epoch for a
> > given
> > > > > >> offset at
> > > > > >> > > the
> > > > > >> > > > > > moment. So if the user does a seek to offset 5, then
> > we'll
> > > > > need
> > > > > >> a
> > > > > >> > new
> > > > > >> > > > API
> > > > > >> > > > > > to find the corresponding epoch in order to fulfill
> the
> > > new
> > > > > >> > > position()
> > > > > >> > > > > API.
> > > > > >> > > > > > Potentially we could modify ListOffsets to enable
> > finding
> > > > the
> > > > > >> > leader
> > > > > >> > > > > epoch,
> > > > > >> > > > > > but I am not sure it is worthwhile. Perhaps it is
> > > reasonable
> > > > > for
> > > > > >> > > > advanced
> > > > > >> > > > > > usage to expect that the epoch information, if needed,
> > > will
> > > > be
> > > > > >> > > > extracted
> > > > > >> > > > > > from the records directly? It might make sense to
> > expose a
> > > > > >> helper
> > > > > >> > in
> > > > > >> > > > > > `ConsumerRecords` to make this a little easier though.
> > > > > >> > > > > >
> > > > > >> > > > > > Alternatively, if we think it is important to have
> this
> > > > > >> information
> > > > > >> > > > > exposed
> > > > > >> > > > > > directly, we could create batch APIs to solve the
> naming
> > > > > >> problem.
> > > > > >> > For
> > > > > >> > > > > > example:
> > > > > >> > > > > >
> > > > > >> > > > > > Map<TopicPartition, OffsetAndEpoch> positions();
> > > > > > >> > > > > > void seek(Map<TopicPartition, OffsetAndEpoch> positions);
> > > > > >> > > > > >
> > > > > >> > > > > > However, I'm actually leaning toward leaving the
> seek()
> > > and
> > > > > >> > > position()
> > > > > >> > > > > APIs
> > > > > >> > > > > > unchanged. Instead, we can add a new API to search for
> > > > offset
> > > > > by
> > > > > >> > > > > timestamp
> > > > > >> > > > > > or by offset/leader epoch. Let's say we call it
> > > > `findOffsets`.
> > > > > >> If
> > > > > >> > the
> > > > > >> > > > > user
> > > > > >> > > > > > hits a log truncation error, they can use this API to
> > find
> > > > the
> > > > > >> > > closest
> > > > > >> > > > > > offset and then do a seek(). At the same time, we
> > > deprecate
> > > > > the
> > > > > >> > > > > > `offsetsForTimes` APIs. We now have two use cases
> which
> > > > > require
> > > > > >> > > finding
> > > > > >> > > > > > offsets, so I think we should make this API general
> and
> > > > leave
> > > > > >> the
> > > > > >> > > door
> > > > > >> > > > > open
> > > > > >> > > > > > for future extensions.
> > > > > >> > > > > >
> > > > > >> > > > > > By the way, I'm unclear about the desire to move part
> of
> > > > this
> > > > > >> > > > > functionality
> > > > > >> > > > > > to AdminClient. Guozhang suggested this previously,
> but
> > I
> > > > > think
> > > > > >> it
> > > > > >> > > only
> > > > > >> > > > > > makes sense for cross-cutting capabilities such as
> topic
> > > > > >> creation.
> > > > > >> > If
> > > > > >> > > > we
> > > > > > >> > > > > > have an API which is primarily useful to consumers,
> > then I
> > > > > think
> > > > > >> > > that's
> > > > > >> > > > > > where it should be exposed. The AdminClient also has
> its
> > > own
> > > > > API
> > > > > >> > > > > integrity
> > > > > >> > > > > > and should not become a dumping ground for advanced
> use
> > > > cases.
> > > > > >> I'll
> > > > > >> > > > > update
> > > > > > >> > > > > > the KIP with the `findOffsets` API suggested above
> and
> > we
> > > > can
> > > > > >> see
> > > > > >> > if
> > > > > >> > > > it
> > > > > >> > > > > > does a good enough job of keeping the API simple for
> > > common
> > > > > >> cases.
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks,
> > > > > >> > > > > > Jason
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <
> > > > > lindon...@gmail.com>
> > > > > >> > > wrote:
> > > > > >> > > > > >
> > > > > >> > > > > > > Hey Jason,
> > > > > >> > > > > > >
> > > > > > >> > > > > > > Regarding seek(...), it seems that we want an API for users
> > > > > > >> > > > > > > to initialize the consumer with (offset, leaderEpoch) and that API
> > should
> > > > > allow
> > > > > >> > > > throwing
> > > > > >> > > > > > > PartitionTruncationException. Suppose we agree on
> > this,
> > > > then
> > > > > >> > > > > > > seekToNearest() is not sufficient because it will
> > always
> > > > > >> swallow
> > > > > >> > > > > > > PartitionTruncationException. Here we have two
> > options.
> > > > The
> > > > > >> first
> > > > > >> > > > > option
> > > > > >> > > > > > is
> > > > > >> > > > > > > to add API offsetsForLeaderEpochs() to translate
> > > > > (leaderEpoch,
> > > > > >> > > > offset)
> > > > > >> > > > > to
> > > > > > >> > > > > > > offset. The second option is to add seek(offset,
> > > > > > >> > > > > > > leaderEpoch). It seems that the second option may be
> > > > > > >> > > > > > > simpler because
> > it
> > > > > makes
> > > > > >> it
> > > > > >> > > > clear
> > > > > >> > > > > > that
> > > > > >> > > > > > > (offset, leaderEpoch) will be used to identify
> > > consumer's
> > > > > >> > position
> > > > > >> > > > in a
> > > > > >> > > > > > > partition. And user only needs to handle
> > > > > >> > > PartitionTruncationException
> > > > > >> > > > > > from
> > > > > >> > > > > > > the poll(). In comparison the first option seems a
> bit
> > > > > harder
> > > > > >> to
> > > > > >> > > use
> > > > > > >> > > > > > > because users also have to handle the
> > > > > >> PartitionTruncationException
> > > > > >> > > if
> > > > > >> > > > > > > offsetsForLeaderEpochs() returns different offset
> from
> > > > > >> > > user-provided
> > > > > >> > > > > > > offset. What do you think?
> > > > > >> > > > > > >
> > > > > >> > > > > > > If we decide to add API seek(offset, leaderEpoch),
> > then
> > > we
> > > > > can
> > > > > >> > > decide
> > > > > >> > > > > > > whether and how to add API to translate (offset,
> > > > > leaderEpoch)
> > > > > >> to
> > > > > >> > > > > offset.
> > > > > >> > > > > > It
> > > > > > >> > > > > > > seems that this API will be needed by advanced users who
> > > > > > >> > > > > > > don't want auto offset reset (so that they can be notified)
> > > > > > >> > > > > > > but still want to reset the offset to the closest one. For
> > > > > > >> > > > > > > those users it probably makes sense
> to
> > > > only
> > > > > >> have
> > > > > >> > > the
> > > > > >> > > > > API
> > > > > >> > > > > > in
> > > > > >> > > > > > > AdminClient. offsetsForTimes() seems like a common
> API
> > > > that
> > > > > >> will
> > > > > >> > be
> > > > > >> > > > > > needed
> > > > > > >> > > > > > > by users of the consumer in general, so it may be more
> > > > > >> reasonable to
> > > > > >> > > > stay
> > > > > >> > > > > in
> > > > > >> > > > > > > the consumer API. I don't have a strong opinion on
> > > whether
> > > > > >> > > > > > > offsetsForTimes() should be replaced by API in
> > > > AdminClient.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Though (offset, leaderEpoch) is needed to uniquely
> > > > identify
> > > > > a
> > > > > >> > > message
> > > > > >> > > > > in
> > > > > > >> > > > > > > general, it is only needed for advanced users who have
> > > > turned
> > > > > >> on
> > > > > >> > > > unclean
> > > > > >> > > > > > > leader election, need to use seek(..), and don't
> want
> > > auto
> > > > > >> offset
> > > > > >> > > > > reset.
> > > > > >> > > > > > > Most other users probably just want to enable auto
> > > offset
> > > > > >> reset
> > > > > >> > and
> > > > > >> > > > > store
> > > > > >> > > > > > > offset in Kafka. Thus we might want to keep the
> > existing
> > > > > >> > > offset-only
> > > > > >> > > > > APIs
> > > > > >> > > > > > > (e.g. seek() and position()) for most users while
> > adding
> > > > new
> > > > > >> APIs
> > > > > >> > > for
> > > > > >> > > > > > > advanced users. And yes, it seems that we need new
> > name
> > > > for
> > > > > >> > > > position().
> > > > > >> > > > > > >
> > > > > >> > > > > > > Though I think we need new APIs to carry the new
> > > > information
> > > > > >> > (e.g.
> > > > > > >> > > > > > > leaderEpoch), I am not very sure what that should look like.
> > > > > >> One
> > > > > >> > > > > possible
> > > > > >> > > > > > > option is those APIs in KIP-232. Another option is
> > > > something
> > > > > >> like
> > > > > >> > > > this:
> > > > > >> > > > > > >
> > > > > >> > > > > > > `````
> > > > > >> > > > > > > class OffsetEpochs {
> > > > > >> > > > > > >   long offset;
> > > > > >> > > > > > >   int leaderEpoch;
> > > > > >> > > > > > >   int partitionEpoch;   // This may be needed later
> as
> > > > > >> discussed
> > > > > >> > in
> > > > > >> > > > > > KIP-232
> > > > > >> > > > > > >   ... // Hopefully these are all we need to identify
> > > > message
> > > > > >> in
> > > > > >> > > > Kafka.
> > > > > >> > > > > > But
> > > > > >> > > > > > > if we need more then we can add new fields in this
> > > class.
> > > > > >> > > > > > > }
> > > > > >> > > > > > >
> > > > > >> > > > > > > OffsetEpochs offsetEpochs(TopicPartition);
> > > > > >> > > > > > >
> > > > > >> > > > > > > void seek(TopicPartition, OffsetEpochs);
> > > > > >> > > > > > > ``````
> > > > > >> > > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > > > Thanks,
> > > > > >> > > > > > > Dong
> > > > > >> > > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <
> > > > > >> > > > ja...@confluent.io>
> > > > > >> > > > > > > wrote:
> > > > > >> > > > > > >
> > > > > >> > > > > > > > Hey Dong,
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Thanks for the feedback. The first three points
> are
> > > > easy:
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > 1. Yes, we should be consistent.
> > > > > >> > > > > > > > 2. Yes, I will add this.
> > > > > >> > > > > > > > 3. Yes, I think we should document the changes to
> > the
> > > > > >> committed
> > > > > >> > > > > offset
> > > > > >> > > > > > > > schema. I meant to do this, but it slipped my
> mind.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > The latter questions are tougher. One option I was
> > > > > >> considering
> > > > > >> > is
> > > > > >> > > > to
> > > > > >> > > > > > have
> > > > > >> > > > > > > > only `offsetsForLeaderEpochs` exposed from the
> > > consumer
> > > > > and
> > > > > >> to
> > > > > >> > > drop
> > > > > >> > > > > the
> > > > > >> > > > > > > new
> > > > > >> > > > > > > > seek() API. That seems more consistent with the
> > > current
> > > > > use
> > > > > >> of
> > > > > >> > > > > > > > `offsetsForTimes` (we don't have a separate
> > > > > >> `seekToTimestamp`
> > > > > >> > > API).
> > > > > >> > > > > An
> > > > > >> > > > > > > > alternative might be to take a page from the
> > > AdminClient
> > > > > API
> > > > > >> > and
> > > > > >> > > > add
> > > > > >> > > > > a
> > > > > >> > > > > > > new
> > > > > >> > > > > > > > method to generalize offset lookup. For example,
> we
> > > > could
> > > > > >> have
> > > > > >> > > > > > > > `lookupOffsets(LookupOptions)`. We could then
> > > deprecate
> > > > > >> > > > > > > `offsetsForTimes`
> > > > > >> > > > > > > > and this would open the door for future extensions
> > > > without
> > > > > >> > > needing
> > > > > >> > > > > new
> > > > > >> > > > > > > > APIs.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > The case of position() is a little more annoying.
> It
> > > > would
> > > > > >> have
> > > > > >> > > > been
> > > > > >> > > > > > > better
> > > > > >> > > > > > > > had we let this return an object so that it is
> > easier
> > > to
> > > > > >> > extend.
> > > > > >> > > > This
> > > > > >> > > > > > is
> > > > > >> > > > > > > > the only reason I didn't add the API to the KIP.
> > Maybe
> > > > we
> > > > > >> > should
> > > > > >> > > > bite
> > > > > >> > > > > > the
> > > > > >> > > > > > > > bullet and fix this now? Unfortunately we'll have
> to
> > > > come
> > > > > up
> > > > > >> > > with a
> > > > > >> > > > > new
> > > > > >> > > > > > > > name. Maybe `currentPosition`?
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Thoughts?
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > -Jason
> > > > > >> > > > > > > >
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <
> > > > > >> > lindon...@gmail.com>
> > > > > >> > > > > > wrote:
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > > Regarding points 4) and 5) above, motivation for
> > the
> > > > > >> > > alternative
> > > > > >> > > > > APIs
> > > > > >> > > > > > > is
> > > > > >> > > > > > > > > that, if we decide that leaderEpoch is equally
> > > > important
> > > > > >> as
> > > > > >> > > > offset
> > > > > >> > > > > in
> > > > > >> > > > > > > > > identifying a message, then it may be reasonable
> > to
> > > > > always
> > > > > >> > > > specify
> > > > > >> > > > > it
> > > > > >> > > > > > > > > wherever offset is currently required in the
> > > consumer
> > > > > API
> > > > > >> to
> > > > > >> > > > > > identify a
> > > > > >> > > > > > > > > message, e.g. position(), seek(). For example,
> > since
> > > > we
> > > > > >> allow
> > > > > >> > > > user
> > > > > >> > > > > to
> > > > > >> > > > > > > > > retrieve offset using position() instead of
> asking
> > > > user
> > > > > to
> > > > > >> > keep
> > > > > >> > > > > track
> > > > > >> > > > > > > of
> > > > > > >> > > > > > > > > the offset of the latest ConsumerRecord, maybe it would be
> > > > > > >> > > > > > > > > more consistent for the user to also retrieve leaderEpoch
> > > > > > >> > > > > > > > > using position()?
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > > > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <
> > > > > >> > > lindon...@gmail.com>
> > > > > >> > > > > > > wrote:
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > > > > Hey Jason,
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > Thanks for the update. It looks pretty good.
> > Just
> > > > some
> > > > > >> > minor
> > > > > >> > > > > > comments
> > > > > >> > > > > > > > > > below:
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > 1) The KIP adds new error code
> "LOG_TRUNCATION"
> > > and
> > > > > new
> > > > > >> > > > exception
> > > > > >> > > > > > > > > TruncatedPartitionException.
> > > > > >> > > > > > > > > > Can we make the name more consistent, e.g.
> > > > > >> > > > > LogTruncationException?
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > 2) Do we need to add
> UnknownLeaderEpochException
> > > as
> > > > > >> part of
> > > > > >> > > API
> > > > > >> > > > > > > change?
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > 3) Not sure if the offset topic schema is also
> > > > public
> > > > > >> API.
> > > > > >> > If
> > > > > >> > > > so,
> > > > > >> > > > > > > maybe
> > > > > >> > > > > > > > > we
> > > > > >> > > > > > > > > > should also include the schema change in the
> > API?
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > 4) For users who store offset externally,
> > > currently
> > > > > they
> > > > > >> > get
> > > > > >> > > > > offset
> > > > > >> > > > > > > > using
> > > > > >> > > > > > > > > > position(..), store the offset externally, and
> > use
> > > > > >> seek(..)
> > > > > >> > > to
> > > > > >> > > > > > > > initialize
> > > > > >> > > > > > > > > > the consumer next time. After this KIP they
> will
> > > > need
> > > > > to
> > > > > >> > > store
> > > > > >> > > > > and
> > > > > >> > > > > > > use
> > > > > >> > > > > > > > > the
> > > > > >> > > > > > > > > > leaderEpoch together with the offset. Should
> we
> > > also
> > > > > >> update
> > > > > >> > > the
> > > > > >> > > > > API
> > > > > >> > > > > > > so
> > > > > >> > > > > > > > > that
> > > > > >> > > > > > > > > > user can also get leaderEpoch from
> > position(...)?
> > > > Not
> > > > > >> sure
> > > > > >> > if
> > > > > >> > > > it
> > > > > >> > > > > is
> > > > > >> > > > > > > OK
> > > > > >> > > > > > > > to
> > > > > >> > > > > > > > > > ask user to track the latest leaderEpoch of
> > > > > >> ConsumerRecord
> > > > > >> > by
> > > > > >> > > > > > > > themselves.
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > 5) Also for users who store offset externally,
> > > they
> > > > > >> need to
> > > > > >> > > > call
> > > > > >> > > > > > > > seek(..)
> > > > > >> > > > > > > > > > with leaderEpoch to initialize consumer. With
> > > > current
> > > > > >> KIP
> > > > > >> > > users
> > > > > >> > > > > > need
> > > > > >> > > > > > > to
> > > > > >> > > > > > > > > > call seekToNearest(), whose name suggests that
> > the
> > > > > final
> > > > > >> > > > position
> > > > > >> > > > > > may
> > > > > >> > > > > > > > be
> > > > > > >> > > > > > > > > > different from what was requested. However, if users want
> > > > > >> > > > to
> > > > > >> > > > > > > avoid
> > > > > >> > > > > > > > > auto
> > > > > >> > > > > > > > > > offset reset and be notified explicitly when
> > there
> > > > is
> > > > > >> log
> > > > > >> > > > > > truncation,
> > > > > >> > > > > > > > > then seekToNearest()
> > > > > >> > > > > > > > > > probably does not help here. Would it make
> sense
> > > to
> > > > > >> replace
> > > > > >> > > > > > > > > seekToNearest()
> > > > > > >> > > > > > > > > > with seek(offset, leaderEpoch) +
> > > > > > >> > > > > > > > > > AdminClient.offsetsForLeaderEpochs(...)?
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > Thanks,
> > > > > >> > > > > > > > > > Dong
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > > On Wed, Jun 27, 2018 at 3:57 PM, Jason
> > Gustafson <
> > > > > >> > > > > > ja...@confluent.io
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > > > wrote:
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > >> Hey Guozhang,
> > > > > >> > > > > > > > > >>
> > > > > >> > > > > > > > > >> That's fair. In fact, perhaps we do not need
> > this
> > > > API
> > > > > >> at
> > > > > >> > > all.
> > > > > >> > > > We
> > > > > >> > > > > > > > already
> > > > > >> > > > > > > > > >> have the new seek() in this KIP which can do
> > the
> > > > > lookup
> > > > > >> > > based
> > > > > >> > > > on
> > > > > >> > > > > > > epoch
> > > > > >> > > > > > > > > for
> > > > > >> > > > > > > > > >> this use case. I guess we should probably
> call
> > it
> > > > > >> > > > > seekToNearest()
> > > > > >> > > > > > > > though
> > > > > >> > > > > > > > > >> to
> > > > > >> > > > > > > > > >> make it clear that the final position may be
> > > > > different
> > > > > >> > from
> > > > > >> > > > what
> > > > > >> > > > > > was
> > > > > >> > > > > > > > > >> requested.
> > > > > >> > > > > > > > > >>
> > > > > >> > > > > > > > > >> Thanks,
> > > > > >> > > > > > > > > >> Jason
> > > > > >> > > > > > > > > >>
> > > > > >> > > > > > > > > >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang
> Wang
> > <
> > > > > >> > > > > > wangg...@gmail.com>
> > > > > >> > > > > > > > > >> wrote:
> > > > > >> > > > > > > > > >>
> > > > > >> > > > > > > > > >> > Hi Jason,
> > > > > >> > > > > > > > > >> >
> > > > > > >> > > > > > > > > >> > I think it is less worthwhile to add
> > > > > > >> > > > > > > > > >> > KafkaConsumer#offsetsForLeaderEpochs,
> > > > > >> > > > > > > > > >> > since probably only very advanced users are
> > > aware
> > > > > of
> > > > > >> the
> > > > > >> > > > > > > > leaderEpoch,
> > > > > >> > > > > > > > > >> and
> > > > > >> > > > > > > > > >> > hence ever care to use it anyways. It is
> more
> > > > like
> > > > > an
> > > > > >> > > admin
> > > > > >> > > > > > client
> > > > > >> > > > > > > > > >> > operation than a consumer client operation:
> > if
> > > > the
> > > > > >> > > > motivation
> > > > > >> > > > > is
> > > > > >> > > > > > > to
> > > > > > >> > > > > > > > > >> > facilitate a customized reset policy, maybe
> > adding
> > > it
> > > > > as
> > > > > >> > > > > > > > > >> > AdminClient#offsetsForLeaderEpochs
> > > > > >> > > > > > > > > >> > is better as it is not an aggressive
> > assumption
> > > > > that
> > > > > >> for
> > > > > >> > > > such
> > > > > >> > > > > > > > advanced
> > > > > >> > > > > > > > > >> > users they are willing to use some admin
> > client
> > > > to
> > > > > >> get
> > > > > >> > > > further
> > > > > >> > > > > > > > > >> information?
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >> > Guozhang
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason
> > > Gustafson
> > > > <
> > > > > >> > > > > > > > ja...@confluent.io>
> > > > > >> > > > > > > > > >> > wrote:
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >> > > Thanks for the feedback. I've updated the
> > > KIP.
> > > > > >> > > > Specifically
> > > > > >> > > > > I
> > > > > >> > > > > > > > > removed
> > > > > >> > > > > > > > > >> the
> > > > > >> > > > > > > > > >> > > "closest" reset option and the proposal
> to
> > > > reset
> > > > > by
> > > > > >> > > > > timestamp
> > > > > >> > > > > > > when
> > > > > >> > > > > > > > > the
> > > > > >> > > > > > > > > >> > > precise truncation point cannot be
> > > determined.
> > > > > >> > Instead,
> > > > > >> > > I
> > > > > >> > > > > > > proposed
> > > > > >> > > > > > > > > >> that
> > > > > >> > > > > > > > > >> > we
> > > > > >> > > > > > > > > >> > > always reset using the nearest epoch
> when a
> > > > reset
> > > > > >> > policy
> > > > > >> > > > is
> > > > > >> > > > > > > > defined
> > > > > >> > > > > > > > > >> > (either
> > > > > >> > > > > > > > > >> > > "earliest" or "latest"). Does that sound
> > > > > >> reasonable?
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> > > One thing I am still debating is whether
> it
> > > > would
> > > > > >> be
> > > > > >> > > > better
> > > > > >> > > > > to
> > > > > >> > > > > > > > have
> > > > > >> > > > > > > > > a
> > > > > >> > > > > > > > > >> > > separate API to find the closest offset
> > using
> > > > the
> > > > > >> > leader
> > > > > >> > > > > > epoch.
> > > > > >> > > > > > > In
> > > > > >> > > > > > > > > the
> > > > > >> > > > > > > > > >> > > current KIP, I suggested to piggyback
> this
> > > > > >> information
> > > > > >> > > on
> > > > > >> > > > an
> > > > > >> > > > > > > > > >> exception,
> > > > > >> > > > > > > > > >> > but
> > > > > >> > > > > > > > > >> > > I'm beginning to think it would be better
> > not
> > > > to
> > > > > >> hide
> > > > > >> > > the
> > > > > >> > > > > > > lookup.
> > > > > >> > > > > > > > It
> > > > > >> > > > > > > > > >> is
> > > > > >> > > > > > > > > >> > > awkward to implement since it means
> > delaying
> > > > the
> > > > > >> > > exception
> > > > > >> > > > > and
> > > > > >> > > > > > > the
> > > > > >> > > > > > > > > API
> > > > > >> > > > > > > > > >> > may
> > > > > >> > > > > > > > > >> > > actually be useful when customizing reset
> > > logic
> > > > > if
> > > > > >> no
> > > > > >> > > auto
> > > > > >> > > > > > reset
> > > > > >> > > > > > > > > >> policy
> > > > > >> > > > > > > > > >> > is
> > > > > >> > > > > > > > > >> > > defined. I was thinking we can add an API
> > > like
> > > > > the
> > > > > >> > > > > following:
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> > > Map<TopicPartition, OffsetAndEpoch>
> > > > > > >> > > > > > > > > >> > > offsetsForLeaderEpochs(Map<TopicPartition, Integer> epochsToSearch)
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> > > Thoughts?
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> > > -Jason
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason
> > > > > Gustafson <
> > > > > >> > > > > > > > > ja...@confluent.io
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >> > > wrote:
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> > > > @Dong
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > > Those are fair points. Both approaches
> > > > require
> > > > > >> some
> > > > > >> > > > > > fuzziness
> > > > > >> > > > > > > to
> > > > > >> > > > > > > > > >> reset
> > > > > >> > > > > > > > > >> > > the
> > > > > >> > > > > > > > > >> > > > offset in these pathological scenarios
> > and
> > > we
> > > > > >> cannot
> > > > > >> > > > > > guarantee
> > > > > >> > > > > > > > > >> > > > at-least-once delivery either way
> unless
> > we
> > > > > have
> > > > > >> the
> > > > > >> > > > full
> > > > > >> > > > > > > > history
> > > > > >> > > > > > > > > of
> > > > > >> > > > > > > > > >> > > leader
> > > > > >> > > > > > > > > >> > > > epochs that were consumed. The KIP-101
> > > logic
> > > > > may
> > > > > >> > > > actually
> > > > > >> > > > > be
> > > > > >> > > > > > > > more
> > > > > >> > > > > > > > > >> > > accurate
> > > > > >> > > > > > > > > >> > > > than using timestamps because it does
> not
> > > > > depend
> > > > > >> on
> > > > > >> > > the
> > > > > >> > > > > > > messages
> > > > > >> > > > > > > > > >> which
> > > > > >> > > > > > > > > >> > > are
> > > > > >> > > > > > > > > >> > > > written after the unclean leader
> > election.
> > > > The
> > > > > >> case
> > > > > >> > > > we're
> > > > > >> > > > > > > > talking
> > > > > >> > > > > > > > > >> about
> > > > > >> > > > > > > > > >> > > > should be extremely rare in practice
> > > anyway.
> > > > I
> > > > > >> also
> > > > > >> > > > agree
> > > > > >> > > > > > that
> > > > > >> > > > > > > > we
> > > > > >> > > > > > > > > >> may
> > > > > >> > > > > > > > > >> > not
> > > > > >> > > > > > > > > >> > > > want to add new machinery if it only
> > helps
> > > > the
> > > > > >> old
> > > > > >> > > > message
> > > > > >> > > > > > > > format.
> > > > > >> > > > > > > > > >> Ok,
> > > > > >> > > > > > > > > >> > > > let's go ahead and drop the timestamp.
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > > @Guozhang
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > > * My current understanding is that,
> with
> > > > > unclean
> > > > > >> > > leader
> > > > > >> > > > > > > election
> > > > > >> > > > > > > > > >> turned
> > > > > >> > > > > > > > > >> > > on,
> > > > > >> > > > > > > > > >> > > >> exactly-once is out of the window
> since
> > we
> > > > > >> cannot
> > > > > >> > > > > guarantee
> > > > > >> > > > > > > > that
> > > > > >> > > > > > > > > >> all
> > > > > >> > > > > > > > > >> > > >> committed message markers will not be
> > > lost.
> > > > > And
> > > > > >> > hence
> > > > > >> > > > > there
> > > > > >> > > > > > > is
> > > > > >> > > > > > > > no
> > > > > >> > > > > > > > > >> need
> > > > > >> > > > > > > > > >> > > to
> > > > > >> > > > > > > > > >> > > >> have special handling logic for
> > > > LOG_TRUNCATED
> > > > > or
> > > > > >> > OOR
> > > > > >> > > > > error
> > > > > >> > > > > > > > codes
> > > > > >> > > > > > > > > >> with
> > > > > >> > > > > > > > > >> > > >> read.committed turned on. Is that
> right?
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > > Yes, that's right. EoS and unclean
> leader
> > > > > >> election
> > > > > >> > > don't
> > > > > >> > > > > mix
> > > > > >> > > > > > > > well.
> > > > > >> > > > > > > > > >> It
> > > > > >> > > > > > > > > >> > may
> > > > > >> > > > > > > > > >> > > > be worth considering separately whether
> > we
> > > > > should
> > > > > >> > try
> > > > > >> > > to
> > > > > >> > > > > > > > reconcile
> > > > > >> > > > > > > > > >> the
> > > > > >> > > > > > > > > >> > > > transaction log following an unclean
> > leader
> > > > > >> > election.
> > > > > >> > > At
> > > > > >> > > > > > least
> > > > > >> > > > > > > > we
> > > > > >> > > > > > > > > >> may
> > > > > >> > > > > > > > > >> > be
> > > > > >> > > > > > > > > >> > > > able to prevent dangling transactions
> > from
> > > > > >> blocking
> > > > > >> > > > > > consumers.
> > > > > >> > > > > > > > > This
> > > > > >> > > > > > > > > >> KIP
> > > > > >> > > > > > > > > >> > > > does not address this problem.
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > > * MINOR: "if the epoch is greater than
> > the
> > > > > >> minimum
> > > > > >> > > > > expected
> > > > > >> > > > > > > > epoch,
> > > > > >> > > > > > > > > >> that
> > > > > >> > > > > > > > > >> > > the
> > > > > >> > > > > > > > > >> > > >> new epoch does not begin at an earlier
> > > > offset
> > > > > >> than
> > > > > >> > > the
> > > > > >> > > > > > fetch
> > > > > >> > > > > > > > > >> offset.
> > > > > >> > > > > > > > > >> > In
> > > > > >> > > > > > > > > >> > > >> the latter case, the leader can
> respond
> > > > with a
> > > > > >> new
> > > > > >> > > > > > > > LOG_TRUNCATION
> > > > > >> > > > > > > > > >> > error
> > > > > >> > > > > > > > > >> > > >> code" should it be "does not begin at
> a
> > > > later
> > > > > >> > offset
> > > > > >> > > > than
> > > > > >> > > > > > the
> > > > > >> > > > > > > > > fetch
> > > > > >> > > > > > > > > >> > > >> offset"?
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > > I think the comment is correct, though
> > the
> > > > > >> phrasing
> > > > > >> > > may
> > > > > >> > > > be
> > > > > >> > > > > > > > > >> confusing.
> > > > > >> > > > > > > > > >> > We
> > > > > >> > > > > > > > > >> > > > know truncation has occurred if there
> > > exists
> > > > a
> > > > > >> > larger
> > > > > >> > > > > epoch
> > > > > >> > > > > > > > with a
> > > > > >> > > > > > > > > >> > > starting
> > > > > >> > > > > > > > > >> > > > offset that is lower than the fetch
> > offset.
> > > > Let
> > > > > >> me
> > > > > >> > try
> > > > > >> > > > to
> > > > > >> > > > > > > > rephrase
> > > > > >> > > > > > > > > >> > this.
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > > Thanks,
> > > > > >> > > > > > > > > >> > > > Jason
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > > On Wed, Jun 27, 2018 at 9:17 AM,
> Guozhang
> > > > Wang
> > > > > <
> > > > > >> > > > > > > > > wangg...@gmail.com>
> > > > > >> > > > > > > > > >> > > wrote:
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > >> Jason, thanks for the KIP. A few
> > comments:
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >> * I think Dong's question about
> whether
> > to
> > > > use
> > > > > >> > > > > > > timestamp-based
> > > > > >> > > > > > > > > >> > approach
> > > > > > >> > > > > > > > > >> > > >> vs. start-offset-of-first-larger-epoch
> > > is
> > > > > >> valid;
> > > > > >> > > more
> > > > > >> > > > > > > > > >> specifically,
> > > > > >> > > > > > > > > >> > > with
> > > > > > >> > > > > > > > > >> > > >> timestamp-based approach we may still be resetting to
> > > > > >> > > an
> > > > > >> > > > > > > offset
> > > > > >> > > > > > > > > >> falling
> > > > > >> > > > > > > > > >> > > >> into
> > > > > >> > > > > > > > > >> > > >> the truncated interval, and hence we
> may
> > > > still
> > > > > >> miss
> > > > > >> > > > some
> > > > > >> > > > > > > data,
> > > > > >> > > > > > > > > i.e.
> > > > > >> > > > > > > > > >> > not
> > > > > >> > > > > > > > > >> > > >> guaranteeing at-least-once still. With
> > the
> > > > > >> > > > > > > > > >> > > >> start-offset-of-first-larger-epoch,
> I'm
> > > not
> > > > > sure
> > > > > >> > if
> > > > > >> > > it
> > > > > >> > > > > > will
> > > > > >> > > > > > > > > >> guarantee
> > > > > >> > > > > > > > > >> > > no
> > > > > >> > > > > > > > > >> > > >> valid data is missed when we have
> > > > consecutive
> > > > > >> log
> > > > > >> > > > > > truncations
> > > > > >> > > > > > > > > >> (maybe
> > > > > >> > > > > > > > > >> > we
> > > > > >> > > > > > > > > >> > > >> need to look back into details of
> > KIP-101
> > > to
> > > > > >> figure
> > > > > >> > > it
> > > > > >> > > > > > out).
> > > > > >> > > > > > > If
> > > > > >> > > > > > > > > the
> > > > > >> > > > > > > > > >> > > latter
> > > > > >> > > > > > > > > >> > > >> can indeed guarantee at least once, we
> > > could
> > > > > >> > consider
> > > > > >> > > > > using
> > > > > >> > > > > > > > that
> > > > > >> > > > > > > > > >> > > approach.
> > > > > >> > > > > > > > > >> > > >>
> > > > > > >> > > > > > > > > >> > > >> * My current understanding is that, with unclean leader election
> > > > > > >> > > > > > > > > >> > > >> turned on, exactly-once is out of the window since we cannot
> > > > > > >> > > > > > > > > >> > > >> guarantee that all committed message markers will not be lost.
> > > > > > >> > > > > > > > > >> > > >> And hence there is no need to have special handling logic for
> > > > > > >> > > > > > > > > >> > > >> LOG_TRUNCATED or OOR error codes with read.committed turned on.
> > > > > > >> > > > > > > > > >> > > >> Is that right?
> > > > > >> > > > > > > > > >> > > >>
> > > > > > >> > > > > > > > > >> > > >> * MINOR: "if the epoch is greater than the minimum expected
> > > > > > >> > > > > > > > > >> > > >> epoch, that the new epoch does not begin at an earlier offset
> > > > > > >> > > > > > > > > >> > > >> than the fetch offset. In the latter case, the leader can
> > > > > > >> > > > > > > > > >> > > >> respond with a new LOG_TRUNCATION error code" should it be
> > > > > > >> > > > > > > > > >> > > >> "does not begin at a later offset than the fetch offset"?
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >> Guozhang
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >>
> > > > > > >> > > > > > > > > >> > > >> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >> > Hey Jason,
> > > > > >> > > > > > > > > >> > > >> >
> > > > > >> > > > > > > > > >> > > >> > Thanks for the explanation.
> > > > > >> > > > > > > > > >> > > >> >
> > > > > > >> > > > > > > > > >> > > >> > Please correct me if this is wrong. The "unknown truncation
> > > > > > >> > > > > > > > > >> > > >> > offset" scenario happens when the consumer does not have the
> > > > > > >> > > > > > > > > >> > > >> > full leaderEpoch -> offset mapping. In this case we can still
> > > > > > >> > > > > > > > > >> > > >> > use the KIP-101-based approach to truncate the offset to
> > > > > > >> > > > > > > > > >> > > >> > "start offset of the first Leader Epoch larger than last
> > > > > > >> > > > > > > > > >> > > >> > epoch of the consumer", but it may be inaccurate. So the KIP
> > > > > > >> > > > > > > > > >> > > >> > chooses to use the timestamp-based approach, which is also
> > > > > > >> > > > > > > > > >> > > >> > best-effort.
> > > > > >> > > > > > > > > >> > > >> >
> > > > > > >> > > > > > > > > >> > > >> > If this understanding is correct, for the "closest" offset
> > > > > > >> > > > > > > > > >> > > >> > reset policy and the "unknown truncation offset" scenario, I
> > > > > > >> > > > > > > > > >> > > >> > am wondering whether it may be better to replace the
> > > > > > >> > > > > > > > > >> > > >> > timestamp-based approach with the KIP-101-based approach. In
> > > > > > >> > > > > > > > > >> > > >> > comparison to the timestamp-based approach, the KIP-101-based
> > > > > > >> > > > > > > > > >> > > >> > approach seems to simplify the API a bit since the user does
> > > > > > >> > > > > > > > > >> > > >> > not need to understand timestamps. Both approaches are
> > > > > > >> > > > > > > > > >> > > >> > best-effort and do not guarantee that the consumer can
> > > > > > >> > > > > > > > > >> > > >> > consume all messages. It is not like KIP-279, which
> > > > > > >> > > > > > > > > >> > > >> > guarantees that the follower broker can consume all messages
> > > > > > >> > > > > > > > > >> > > >> > from the leader.
> > > > > >> > > > > > > > > >> > > >> >
> > > > > > >> > > > > > > > > >> > > >> > Then it seems that the remaining difference is mostly about
> > > > > > >> > > > > > > > > >> > > >> > accuracy, i.e. how many messages will be duplicated or missed
> > > > > > >> > > > > > > > > >> > > >> > in the "unknown truncation offset" scenario. Not sure either
> > > > > > >> > > > > > > > > >> > > >> > one is clearly better than the other. Note that there are two
> > > > > > >> > > > > > > > > >> > > >> > scenarios mentioned in KIP-279 which are not addressed by
> > > > > > >> > > > > > > > > >> > > >> > KIP-101. Both scenarios require quick leadership change
> > > > > > >> > > > > > > > > >> > > >> > between brokers, which seems to suggest that the offset
> > > > > > >> > > > > > > > > >> > > >> > obtained by "start offset of the first Leader Epoch larger
> > > > > > >> > > > > > > > > >> > > >> > than last epoch of the consumer" under these two scenarios
> > > > > > >> > > > > > > > > >> > > >> > may be very close to the offset obtained by the message
> > > > > > >> > > > > > > > > >> > > >> > timestamp. Does this sound reasonable?
> > > > > >> > > > > > > > > >> > > >> >
> > > > > > >> > > > > > > > > >> > > >> > Good point that users on the v1 format can benefit from the
> > > > > > >> > > > > > > > > >> > > >> > timestamp-based approach. On the other hand, it seems like a
> > > > > > >> > > > > > > > > >> > > >> > short-term benefit for users who have not migrated. I am just
> > > > > > >> > > > > > > > > >> > > >> > not sure whether it is more important than designing a better
> > > > > > >> > > > > > > > > >> > > >> > API.
> > > > > >> > > > > > > > > >> > > >> >
> > > > > > >> > > > > > > > > >> > > >> > Also, for both "latest" and "earliest" reset policy, do you
> > > > > > >> > > > > > > > > >> > > >> > think it would make sense to also use the KIP-101-based
> > > > > > >> > > > > > > > > >> > > >> > approach to truncate offset for the "unknown truncation
> > > > > > >> > > > > > > > > >> > > >> > offset" scenario?
> > > > > >> > > > > > > > > >> > > >> >
> > > > > >> > > > > > > > > >> > > >> >
> > > > > >> > > > > > > > > >> > > >> > Thanks,
> > > > > >> > > > > > > > > >> > > >> > Dong
> > > > > >> > > > > > > > > >> > > >> >
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >> --
> > > > > >> > > > > > > > > >> > > >> -- Guozhang
> > > > > >> > > > > > > > > >> > > >>
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > > >
> > > > > >> > > > > > > > > >> > >
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >> > --
> > > > > >> > > > > > > > > >> > -- Guozhang
> > > > > >> > > > > > > > > >> >
> > > > > >> > > > > > > > > >>
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > > >
> > > > > >> > > > > > > > >
> > > > > >> > > > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
