Hi All,

I have made some updates to the KIP. As many of you know, a side project of
mine has been specifying the Kafka replication protocol in TLA. You can
check out the code here if you are interested:
https://github.com/hachikuji/kafka-specification. In addition to uncovering
a couple of unknown bugs in the replication protocol (e.g.
https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me
validate the behavior in this KIP. In fact, the original version I proposed
had a weakness. I initially suggested letting the leader validate the
expected epoch at the fetch offset. This made sense for the consumer's
handling of unclean leader election, but it was not strong enough to
protect the follower in all cases. In order to make advancement of the high
watermark safe, for example, the leader actually needs to be sure that
every follower in the ISR matches its own epoch.
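
To make the distinction concrete, here is a toy sketch of the two checks.
The names are made up and this is not the actual broker code; it only
illustrates the invariant above:

// Toy illustration only -- not Kafka's replica manager code.
final class FencingSketch {
    // The check I originally proposed: validate the epoch of the record at
    // the fetch offset. A follower that has not yet observed the latest
    // leader change can still pass this check, so its fetch cannot safely
    // count toward high watermark advancement.
    static boolean weakCheck(int epochAtFetchOffset, int expectedEpoch) {
        return epochAtFetchOffset == expectedEpoch;
    }

    // The check the leader actually needs: the follower's current epoch
    // must match the leader's own current epoch.
    static boolean strongCheck(int followerCurrentEpoch, int leaderCurrentEpoch) {
        return followerCurrentEpoch == leaderCurrentEpoch;
    }
}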

I attempted to fix this problem by treating the epoch in the fetch request
slightly differently for consumers and followers. For consumers, it would
be the expected epoch of the record at the fetch offset, and the leader
would raise a LOG_TRUNCATION error if the expectation failed. For
followers, it would be the current epoch and the leader would require that
it match its own epoch. This was unsatisfying both because of the
inconsistency in behavior and because the consumer was left with the weaker
fencing that we already knew was insufficient for the replicas. Ultimately
I decided that we should make the behavior consistent and that meant that
the consumer needed to act more like a following replica. Instead of
checking for truncation while fetching, the consumer should check for
truncation after leader changes. After checking for truncation, the
consumer can then use the current epoch when fetching and get the stronger
protection that it provides. What this means is that the Metadata API must
include the current leader epoch. Given the problems we have had around
stale metadata and how challenging they have been to debug, I'm convinced
that this is a good idea in any case and it resolves the inconsistent
behavior in the Fetch API. The downside is that there will be some
additional overhead upon leader changes, but I don't think it is a major
concern since leader changes are rare and the OffsetsForLeaderEpoch request
is cheap.
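
To make the consumer side concrete, here is a rough sketch of the check
after a leader change. The abstract helpers are placeholders for state the
fetcher already tracks, and IllegalStateException stands in for the
LogTruncationException proposed in the KIP; this is not actual client code:

import org.apache.kafka.common.TopicPartition;

abstract class TruncationCheckSketch {
    // Epoch of the last record this consumer actually fetched.
    abstract int lastFetchedEpoch(TopicPartition tp);
    // Next offset we expect to fetch.
    abstract long position(TopicPartition tp);
    // End offset for the given epoch, via an OffsetsForLeaderEpoch request.
    abstract long endOffsetForEpoch(TopicPartition tp, int epoch);
    // Use this epoch in subsequent Fetch requests.
    abstract void useEpochForFetches(TopicPartition tp, int epoch);

    void onLeaderChange(TopicPartition tp, int newLeaderEpoch) {
        long endOffset = endOffsetForEpoch(tp, lastFetchedEpoch(tp));
        if (endOffset < position(tp)) {
            // The new leader's log diverged before our position.
            throw new IllegalStateException(
                "Log truncated for " + tp + " at offset " + endOffset);
        }
        // No divergence: fetch with the current epoch, which gives the
        // stronger fencing described above.
        useEpochForFetches(tp, newLeaderEpoch);
    }
}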

This approach leaves the door open for some interesting follow up
improvements. For example, now that we have the leader epoch in the
Metadata request, we can implement similar fencing for the Produce API. And
now that the consumer can reason about truncation, we could consider having
a configuration to expose records beyond the high watermark. This would let
users accept weaker durability semantics in exchange for lower end-to-end
latency. It is sort of like having an acks=0 option for the consumer. Neither
of these options is included in this KIP; I am just mentioning them as
potential work for the future.

Finally, based on the discussion in this thread, I have added the
seekToCommitted API for the consumer. Please take a look and let me know
what you think.
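
For users who store offsets outside of Kafka, the workflow might look
roughly like the snippet below. This is only a sketch against the proposed
API: seekToCommitted, the leader-epoch-aware OffsetAndMetadata constructor,
and the record's leaderEpoch() accessor are what the KIP proposes rather
than the current client API, and process()/saveExternally() are
placeholders for application code:

TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(tp));

// On startup: restore both the position and the fetch state from external
// storage, so truncation since the last run can be detected.
consumer.seekToCommitted(tp,
    new OffsetAndMetadata(savedOffset, savedMetadata, savedLeaderEpoch));

// In the poll loop: persist the offset and leader epoch of consumed records.
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
    process(record);
    saveExternally(tp, record.offset() + 1, record.leaderEpoch());
}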

Thanks,
Jason

On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Jason,
>
> The proposed API seems reasonable to me too. Could you please also update
> the wiki page (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
> with a section, say "workflow", on how the proposed API will be used
> together with the others to:
>
> 1. consumer callers handling a LogTruncationException.
> 2. consumer internals for handling a retriable UnknownLeaderEpochException.
>
>
> Guozhang
>
>
> On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner <a...@confluent.io> wrote:
>
> > Hi Jason,
> >
> >
> > I also like your proposal and agree that KafkaConsumer#seekToCommitted()
> > is a more intuitive way to initialize both the consumer's position and its
> > fetch state.
> >
> >
> > My understanding is that KafkaConsumer#seekToCommitted() is purely for
> > clients
> > who store their offsets externally, right? And we are still going to
> > add KafkaConsumer#findOffsets()
> > in this KIP as we discussed, so that the client can handle
> > LogTruncationException?
> >
> >
> > Thanks,
> >
> > Anna
> >
> >
> > On Thu, Jul 12, 2018 at 3:57 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jason,
> > >
> > > It is a great summary. The solution sounds good. I might have minor
> > > comments regarding the method name. But we can discuss those minor
> > > points later after we reach consensus on the high-level API.
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hey Anna and Dong,
> > > >
> > > > Thanks a lot for the great discussion. I've been hanging back a bit
> > > because
> > > > honestly the best option hasn't seemed clear. I agree with Anna's
> > general
> > > > observation that there is a distinction between the position of the
> > > > consumer and its fetch state up to that position. If you think about
> > it,
> > > a
> > > > committed offset actually represents both of these. The metadata is
> > used
> > > to
> > > > initialize the state of the consumer application and the offset
> > > initializes
> > > > the position. Additionally, we are extending the offset commit in
> this
> > > KIP
> > > > to also include the last epoch fetched by the consumer, which is used
> > to
> > > > initialize the internal fetch state. Of course if you do an arbitrary
> > > > `seek` and immediately commit offsets, then there won't be a last
> epoch
> > > to
> > > > commit. This seems intuitive since there is no fetch state in this
> > case.
> > > We
> > > > only commit fetch state when we have it.
> > > >
> > > > So if we think about a committed offset as initializing both the
> > > consumer's
> > > > position and its fetch state, then the gap in the API is evidently
> that
> > > we
> > > > don't have a way to initialize the consumer to a committed offset. We
> > do
> > > it
> > > > implicitly of course for offsets stored in Kafka, but since external
> > > > storage is a use case we support, then we should have an explicit API
> > as
> > > > well. Perhaps something like this:
> > > >
> > > > seekToCommitted(TopicPartition, OffsetAndMetadata)
> > > >
> > > > In this KIP, we are proposing to allow the `OffsetAndMetadata` object
> > to
> > > > include the leader epoch, so I think this would have the same effect
> as
> > > > Anna's suggested `seekToRecord`. But perhaps it is a more natural fit
> > > given
> > > > the current API? Furthermore, if we find a need for additional
> metadata
> > > in
> > > > the offset commit API in the future, then we will just need to modify
> > the
> > > > `OffsetAndMetadata` object and we will not need a new `seek` API.
> > > >
> > > > With this approach, I think then we can leave the `position` API as
> it
> > > is.
> > > > The position of the consumer is still just the next expected fetch
> > > offset.
> > > > If a user needs to record additional state based on previous fetch
> > > > progress, then they would use the result of the previous fetch to
> > obtain
> > > > it. This makes the dependence on fetch progress explicit. I think we
> > > could
> > > > make this a little more convenient with a helper in the
> > > `ConsumerRecords`
> > > > object, but I think that's more of a nice-to-have.
> > > >
> > > > Thoughts?
> > > >
> > > > By the way, I have been iterating a little bit on the replica side of
> > > this
> > > > KIP. My initial proposal in fact did not have strong enough fencing
> to
> > > > protect all of the edge cases. I believe the current proposal fixes
> the
> > > > problems, but I am still verifying the model.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >
> > > > > Hey Anna,
> > > > >
> > > > > Thanks much for the explanation. Approach 1 also sounds good to
> me. I
> > > > think
> > > > > findOffsets() is useful for users who don't use automatic offset
> > reset
> > > > > policy.
> > > > >
> > > > > Just one more question. Since users who store offsets externally
> need
> > > to
> > > > > provide leaderEpoch to findOffsets(...), do we need an extra API
> for
> > > user
> > > > > to get both offset and leaderEpoch, e.g. recordPosition()?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner <a...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > Hi Dong,
> > > > > >
> > > > > >
> > > > > > What I called “not covering all use cases” is what you call
> > > best-effort
> > > > > > (not guaranteeing some corner cases). I think we are on the same
> > page
> > > > > here.
> > > > > >
> > > > > >
> > > > > > I wanted to be clear in the API whether the consumer seeks to a
> > > > position
> > > > > > (offset) or to a record (offset, leader epoch). The only use-case
> > of
> > > > > > seeking to a record is seeking to a committed offset for a user
> who
> > > > > stores
> > > > > > committed offsets externally. (Unless users find some other
> reason
> > to
> > > > > seek
> > > > > > to a record.) I thought it was possible to provide this
> > functionality
> > > > > with
> > > > > > findOffset(offset, leader epoch) followed by a seek(offset).
> > However,
> > > > you
> > > > > > are right that this will not handle the race condition where
> > > > > non-divergent
> > > > > > offset found by findOffset() could change again before the
> consumer
> > > > does
> > > > > > the first fetch.
> > > > > >
> > > > > >
> > > > > > Regarding position() — if we add position that returns (offset,
> > > leader
> > > > > > epoch), this is specifically a position after a record that was
> > > > actually
> > > > > > consumed or position of a committed record. In which case, I
> still
> > > > think
> > > > > > it’s cleaner to get a record position of a consumed message from a
> > new
> > > > > helper
> > > > > > method in ConsumerRecords() or from committed offsets.
> > > > > >
> > > > > >
> > > > > > I think all the use-cases could be then covered with:
> > > > > >
> > > > > > (Approach 1)
> > > > > >
> > > > > > seekToRecord(offset, leaderEpoch) — this will just initialize/set
> > the
> > > > > > consumer state;
> > > > > >
> > > > > > findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}
> > > > > >
> > > > > >
> > > > > > If we agree that the race condition is also a corner case, then I
> > > think
> > > > > we
> > > > > > can cover use-cases with:
> > > > > >
> > > > > > (Approach 2)
> > > > > >
> > > > > > findOffsets(offset, leaderEpoch) returns offset — we still want
> > > leader
> > > > > > epoch as a parameter for the users who store their committed
> > offsets
> > > > > > externally.
> > > > > >
> > > > > >
> > > > > > I am actually now leaning more to approach 1, since it is more
> > > > explicit,
> > > > > > and maybe there are more use cases for it.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Anna
> > > > > >
> > > > > >
> > > > > > On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hey Anna,
> > > > > > >
> > > > > > > Thanks for the comment. To answer your question, it seems that
> we
> > > can
> > > > > > cover
> > > > > > > all cases in this KIP. As stated in the "Consumer Handling" section,
> > > > KIP-101
> > > > > > > based approach will be used to derive the truncation offset
> from
> > > the
> > > > > > > 2-tuple (offset, leaderEpoch). This approach is best effort and
> > it
> > > is
> > > > > > > inaccurate only in very rare scenarios (as described in
> KIP-279).
> > > > > > >
> > > > > > > By using seek(offset, leaderEpoch), consumer will still be able
> > to
> > > > > follow
> > > > > > > this best-effort approach to detect log truncation and
> determine
> > > the
> > > > > > > truncation offset. On the other hand, if we use seek(offset),
> > > > consumer
> > > > > > will
> > > > > > > not detect log truncation in some cases which weakens the
> > guarantee
> > > > of
> > > > > > this
> > > > > > > KIP. Does this make sense?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > > On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <
> a...@confluent.io
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Sorry, I hit "send" before finishing. Continuing...
> > > > > > > >
> > > > > > > >
> > > > > > > > 2) Hiding most of the consumer's log truncation handling logic
> > > > > > > > with minimal exposure in the KafkaConsumer API. I was proposing
> > > > > > > > this path.
> > > > > > > >
> > > > > > > >
> > > > > > > > Before answering your specific questions… I want to answer to
> > > your
> > > > > > > comment
> > > > > > > > “In general, maybe we should discuss the final solution that
> > > covers
> > > > > all
> > > > > > > > cases?”. With current KIP, we don’t cover all cases of
> consumer
> > > > > > detecting
> > > > > > > > log truncation because the KIP proposes a leader epoch cache
> in
> > > > > > consumer
> > > > > > > > that does not persist across restarts. Plus, we only store
> last
> > > > > > committed
> > > > > > > > offset (either internally or users can store externally).
> This
> > > has
> > > > a
> > > > > > > > limitation that the consumer will not always be able to find
> > > point
> > > > of
> > > > > > > > truncation just because we have a limited history (just one
> > data
> > > > > > point).
> > > > > > > >
> > > > > > > >
> > > > > > > > So, maybe we should first agree on whether we accept that
> > storing
> > > > > last
> > > > > > > > committed offset/leader epoch has a limitation that the
> > consumer
> > > > will
> > > > > > not
> > > > > > > > be able to detect log truncation in all cases?
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Anna
> > > > > > > >
> > > > > > > > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <
> > a...@confluent.io>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Dong,
> > > > > > > > >
> > > > > > > > > Thanks for the follow up! I finally have much more clear
> > > > > > understanding
> > > > > > > of
> > > > > > > > > where you are coming from.
> > > > > > > > >
> > > > > > > > > You are right. The success of findOffsets()/finding a point
> > of
> > > > > > > > > non-divergence depends on whether we have enough entries in
> > the
> > > > > > > > consumer's
> > > > > > > > > leader epoch cache. However, I think this is a fundamental
> > > > > limitation
> > > > > > > of
> > > > > > > > > having a leader epoch cache that does not persist across
> > > consumer
> > > > > > > > restarts.
> > > > > > > > >
> > > > > > > > > If we consider the general case where consumer may or may
> not
> > > > have
> > > > > > this
> > > > > > > > > cache, then I see two paths:
> > > > > > > > > 1) Letting the user to track the leader epoch history
> > > externally,
> > > > > and
> > > > > > > > have
> > > > > > > > > more exposure to leader epoch and finding point of
> > > non-divergence
> > > > > in
> > > > > > > > > KafkaConsumer API. I understand this is the case you were
> > > talking
> > > > > > > about.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hey Anna,
> > > > > > > > >>
> > > > > > > > >> Thanks much for your detailed explanation and example! It
> > does
> > > > > help
> > > > > > me
> > > > > > > > >> understand the difference between our understanding.
> > > > > > > > >>
> > > > > > > > >> So it seems that the solution based on findOffsets()
> > currently
> > > > > > focuses
> > > > > > > > >> mainly on the scenario that consumer has cached
> leaderEpoch
> > ->
> > > > > > offset
> > > > > > > > >> mapping whereas I was thinking about the general case
> where
> > > > > consumer
> > > > > > > may
> > > > > > > > >> or
> > > > > > > > >> may not have this cache. I guess that is why we have
> > different
> > > > > > > > >> understanding here. I have some comments below.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> 3) The proposed solution using findOffsets(offset,
> > > leaderEpoch)
> > > > > > > followed
> > > > > > > > >> by
> > > > > > > > >> seek(offset) works if consumer has the cached leaderEpoch
> ->
> > > > > offset
> > > > > > > > >> mapping. But if we assume consumer has this cache, do we
> > need
> > > to
> > > > > > have
> > > > > > > > >> leaderEpoch in the findOffsets(...)? Intuitively, the
> > > > > > > > findOffsets(offset)
> > > > > > > > >> can also derive the leaderEpoch using offset just like the
> > > > > proposed
> > > > > > > > >> solution does with seek(offset).
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> 4) If consumer does not have cached leaderEpoch -> offset
> > > > mapping,
> > > > > > > which
> > > > > > > > >> is
> > > > > > > > >> the case if consumer is restarted on a new machine, then
> it
> > is
> > > > not
> > > > > > > clear
> > > > > > > > >> what leaderEpoch would be included in the FetchRequest if
> > > > consumer
> > > > > > > does
> > > > > > > > >> seek(offset). This is the case that motivates the first
> > > question
> > > > > of
> > > > > > > the
> > > > > > > > >> previous email. In general, maybe we should discuss the
> > final
> > > > > > solution
> > > > > > > > >> that
> > > > > > > > >> covers all cases?
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> 5) The second question in my previous email is related to
> > the
> > > > > > > following
> > > > > > > > >> paragraph:
> > > > > > > > >>
> > > > > > > > >> "... In some cases, offsets returned from position() could
> > be
> > > > > actual
> > > > > > > > >> consumed messages by this consumer identified by {offset,
> > > leader
> > > > > > > epoch}.
> > > > > > > > >> In
> > > > > > > > >> other cases, position() returns offset that was not
> actually
> > > > > > consumed.
> > > > > > > > >> Suppose, the user calls position() for the last
> offset...".
> > > > > > > > >>
> > > > > > > > >> I guess my point is that, if user calls position() for the
> > > last
> > > > > > offset
> > > > > > > > and
> > > > > > > > >> uses that offset in seek(...), then user can probably just
> > > call
> > > > > > > > >> Consumer#seekToEnd() without calling position() and
> > seek(...).
> > > > > > > Similarly
> > > > > > > > >> user can call Consumer#seekToBeginning() to the seek to
> the
> > > > > earliest
> > > > > > > > >> position without calling position() and seek(...). Thus
> > > > position()
> > > > > > > only
> > > > > > > > >> needs to return the actual consumed messages identified by
> > > > > {offset,
> > > > > > > > leader
> > > > > > > > >> epoch}. Does this make sense?
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >> Dong
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <
> > > a...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >> > Hi Dong,
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Thanks for considering my suggestions.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Based on your comments, I realized that my suggestion
> was
> > > not
> > > > > > > complete
> > > > > > > > >> with
> > > > > > > > >> > regard to KafkaConsumer API vs. consumer-broker
> protocol.
> > > > While
> > > > > I
> > > > > > > > >> propose
> > > > > > > > >> > to keep KafkaConsumer#seek() unchanged and take offset
> > only,
> > > > the
> > > > > > > > >> underlying
> > > > > > > > >> > consumer will send the next FetchRequest() to broker
> with
> > > > offset
> > > > > > and
> > > > > > > > >> > leaderEpoch if it is known (based on leader epoch cache
> in
> > > > > > > consumer) —
> > > > > > > > >> note
> > > > > > > > >> > that this is different from the current KIP, which
> > suggests
> > > to
> > > > > > > always
> > > > > > > > >> send
> > > > > > > > >> > unknown leader epoch after seek(). This way, if the
> > consumer
> > > > > and a
> > > > > > > > >> broker
> > > > > > > > >> > agreed on the point of non-divergence, which is some
> > > {offset,
> > > > > > > > >> leaderEpoch}
> > > > > > > > >> > pair, the new leader which causes another truncation
> (even
> > > > > further
> > > > > > > > back)
> > > > > > > > >> > will be able to detect new divergence and restart the
> > > process
> > > > of
> > > > > > > > finding
> > > > > > > > >> > the new point of non-divergence. So, to answer your
> > > question,
> > > > If
> > > > > > the
> > > > > > > > >> > truncation happens just after the user calls
> > > > > > > > >> > KafkaConsumer#findOffsets(offset, leaderEpoch) followed
> > by
> > > > > > > > seek(offset),
> > > > > > > > >> > the user will not seek to the wrong position without
> > knowing
> > > > > that
> > > > > > > > >> > truncation has happened, because the consumer will get
> > > another
> > > > > > > > >> truncation
> > > > > > > > >> > error, and seek again.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I am afraid, I did not understand your second question.
> > Let
> > > me
> > > > > > > > >> summarize my
> > > > > > > > >> > suggestions again, and then give an example to hopefully
> > > make
> > > > my
> > > > > > > > >> > suggestions more clear. Also, the last part of my
> example
> > > > shows
> > > > > > how
> > > > > > > > the
> > > > > > > > >> > use-case in your first question will work. If it does
> not
> > > > answer
> > > > > > > your
> > > > > > > > >> > second question, would you mind clarifying? I am also
> > > focusing
> > > > > on
> > > > > > > the
> > > > > > > > >> case
> > > > > > > > >> > of a consumer having enough entries in the cache. The
> case
> > > of
> > > > > > > > restarting
> > > > > > > > >> > from committed offset either stored externally or
> > internally
> > > > > will
> > > > > > > > >> probably
> > > > > > > > >> > need to be discussed more.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Let me summarize my suggestion again:
> > > > > > > > >> >
> > > > > > > > >> > 1) KafkaConsumer#seek() and KafkaConsumer#position()
> > remains
> > > > > > > unchanged
> > > > > > > > >> >
> > > > > > > > >> > 2) New KafkaConsumer#findOffsets() takes {offset,
> > > leaderEpoch}
> > > > > > pair
> > > > > > > > per
> > > > > > > > >> > topic partition and returns offset per topic partition.
> > > > > > > > >> >
> > > > > > > > >> > 3) FetchRequest() to broker after KafkaConsumer#seek()
> > will
> > > > > > contain
> > > > > > > > the
> > > > > > > > >> > offset set by seek and leaderEpoch that corresponds to
> the
> > > > > offset
> > > > > > > > based
> > > > > > > > >> on
> > > > > > > > >> > leader epoch cache in the consumer.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > The rest of this e-mail is a long and contrived example
> > with
> > > > > > several
> > > > > > > > log
> > > > > > > > >> > truncations and unclean leader elections to illustrate
> the
> > > API
> > > > > and
> > > > > > > > your
> > > > > > > > >> > first use-case. Suppose we have three brokers.
> Initially,
> > > > Broker
> > > > > > A,
> > > > > > > B,
> > > > > > > > >> and
> > > > > > > > >> > C has one message at offset 0 with leader epoch 0. Then,
> > > > Broker
> > > > > A
> > > > > > > goes
> > > > > > > > >> down
> > > > > > > > >> > for some time. Broker B becomes a leader with epoch 1,
> and
> > > > > writes
> > > > > > > > >> messages
> > > > > > > > >> > to offsets 1 and 2. Broker C fetches offset 1, but
> before
> > > > > fetching
> > > > > > > > >> offset
> > > > > > > > >> > 2, becomes a leader with leader epoch 2 and writes a
> > message
> > > > at
> > > > > > > offset
> > > > > > > > >> 2.
> > > > > > > > >> > Here is the state of brokers at this point:
> > > > > > > > >> >
> > > > > > > > >> > > Broker A:
> > > > > > > > >> > > offset 0, epoch 0 <— leader
> > > > > > > > >> > > goes down…
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > > Broker B:
> > > > > > > > >> > > offset 0, epoch 0
> > > > > > > > >> > > offset 1, epoch 1  <- leader
> > > > > > > > >> > > offset 2, epoch 1
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Broker C:
> > > > > > > > >> > > offset 0, epoch 0
> > > > > > > > >> > > offset 1, epoch 1
> > > > > > > > >> > > offset 2, epoch 2 <— leader
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Before Broker C becomes a leader with leader epoch 2,
> the
> > > > > consumer
> > > > > > > > >> consumed
> > > > > > > > >> > the following messages from broker A and broker B:
> > > > > > > > >> >
> > > > > > > > >> > {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1},
> > > > {offset=2,
> > > > > > > > >> > leaderEpoch=1}.
> > > > > > > > >> >
> > > > > > > > >> > Consumer’s leader epoch cache at this point contains the
> > > > > following
> > > > > > > > >> entries:
> > > > > > > > >> >
> > > > > > > > >> > (leaderEpoch=0, startOffset=0)
> > > > > > > > >> >
> > > > > > > > >> > (leaderEpoch=1, startOffset=1)
> > > > > > > > >> >
> > > > > > > > >> > endOffset = 3
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Then, broker B becomes the follower of broker C,
> truncates
> > > and
> > > > > > > starts
> > > > > > > > >> > fetching from offset 2.
> > > > > > > > >> >
> > > > > > > > >> > Consumer sends fetchRequest(offset=3, leaderEpoch=1) and
> > > gets
> > > > > > > > >> > LOG_TRUNCATION
> > > > > > > > >> > error from broker C.
> > > > > > > > >> >
> > > > > > > > >> > In response, the client calls KafkaConsumer#findOffsets(
> > > > > offset=3,
> > > > > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > > > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker C responds
> > with
> > > > > > > > >> > {leaderEpoch=1, endOffset=2}. So,
> > > > > > > KafkaConsumer#findOffsets(offset=3,
> > > > > > > > >> > leaderEpoch=1) returns offset=2.
> > > > > > > > >> >
> > > > > > > > >> > In response, consumer calls KafkaConsumer@seek
> (offset=2)
> > > > > followed
> > > > > > > by
> > > > > > > > >> > poll(), which results in FetchRequest(offset=2,
> > > leaderEpoch=1)
> > > > > to
> > > > > > > > >> broker C.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I will continue with this example with the goal to
> answer
> > > your
> > > > > > first
> > > > > > > > >> > question about truncation just after findOffsets()
> > followed
> > > by
> > > > > > > seek():
> > > > > > > > >> >
> > > > > > > > >> > Suppose, brokers B and C go down, and broker A comes up
> > and
> > > > > > becomes
> > > > > > > a
> > > > > > > > >> > leader with leader epoch 3, and writes a message to
> offset
> > > 1.
> > > > > > > Suppose,
> > > > > > > > >> this
> > > > > > > > >> > happens before the consumer gets response from broker C
> to
> > > the
> > > > > > > > previous
> > > > > > > > >> > fetch request:  FetchRequest(offset=2, leaderEpoch=1).
> > > > > > > > >> >
> > > > > > > > >> > Consumer re-sends FetchRequest(offset=2, leaderEpoch=1)
> to
> > > > > broker
> > > > > > A,
> > > > > > > > >> which
> > > > > > > > >> > returns LOG_TRUNCATION error, because broker A has
> leader
> > > > epoch
> > > > > 3
> > > > > > >
> > > > > > > > >> leader
> > > > > > > > >> > epoch in FetchRequest with starting offset = 1 < offset
> 2
> > in
> > > > > > > > >> > FetchRequest().
> > > > > > > > >> >
> > > > > > > > >> > In response, the user calls KafkaConsumer#findOffsets(
> > > > offset=2,
> > > > > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > > > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker A responds
> > with
> > > > > > > > >> > {leaderEpoch=0, endOffset=1}; the underlying consumer
> > finds
> > > > > > > > leaderEpoch
> > > > > > > > >> = 0
> > > > > > > > >> > in its cache with end offset == 1, which results in
> > > > > > > > >> > KafkaConsumer#findOffsets(offset=2, leaderEpoch=1)
> > returning
> > > > > > offset
> > > > > > > > = 1.
> > > > > > > > >> >
> > > > > > > > >> > In response, the user calls KafkaConsumer@seek
> (offset=1)
> > > > > followed
> > > > > > > by
> > > > > > > > >> > poll(), which results in FetchRequest(offset=1,
> > > leaderEpoch=0)
> > > > > to
> > > > > > > > >> broker A,
> > > > > > > > >> > which responds with message at offset 1, leader epoch 3.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I will think some more about consumers restarting from
> > > > committed
> > > > > > > > >> offsets,
> > > > > > > > >> > and send a follow up.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> >
> > > > > > > > >> > Anna
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <
> > > lindon...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Hey Anna,
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks much for the thoughtful reply. It makes sense to
> > > > > > > > >> > > differentiate between "seeking to a message" and "seeking
> > > > > > > > >> > > to a position". I have two questions here:
> > > > > > > > >> > >
> > > > > > > > >> > > - For "seeking to a message" use-case, with the
> proposed
> > > > > > approach
> > > > > > > > user
> > > > > > > > >> > > needs to call findOffset(offset, leaderEpoch) followed
> > by
> > > > > > > > >> seek(offset).
> > > > > > > > >> > If
> > > > > > > > >> > > message truncation and message append happen
> immediately
> > > > after
> > > > > > > > >> > > findOffset(offset,
> > > > > > > > >> > > leaderEpoch) but before seek(offset), it seems that
> user
> > > > will
> > > > > > seek
> > > > > > > > to
> > > > > > > > >> the
> > > > > > > > >> > > wrong message without knowing the truncation has
> > happened.
> > > > > Would
> > > > > > > > this
> > > > > > > > >> be
> > > > > > > > >> > a
> > > > > > > > >> > > problem?
> > > > > > > > >> > >
> > > > > > > > >> > > - For "seeking to a position" use-case, it seems that
> > > there
> > > > > can
> > > > > > be
> > > > > > > > two
> > > > > > > > >> > > positions, i.e. earliest and latest. So these two
> cases
> > > can
> > > > > > > > >> > > fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd().
> > > > > > Consumer.seekToEnd().
> > > > > > > > >> Then it
> > > > > > > > >> > > seems that user will only need to call position() and
> > > seek()
> > > > > for
> > > > > > > > >> "seeking
> > > > > > > > >> > > to a message" use-case?
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks,
> > > > > > > > >> > > Dong
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <
> > > > > > a...@confluent.io>
> > > > > > > > >> wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > Hi Jason and Dong,
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > I’ve been thinking about your suggestions and
> > discussion
> > > > > > > regarding
> > > > > > > > >> > > > position(), seek(), and new proposed API.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > Here is my thought process why we should keep
> > position()
> > > > and
> > > > > > > > seek()
> > > > > > > > >> API
> > > > > > > > >> > > > unchanged.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > I think we should separate {offset, leader epoch}
> that
> > > > > > uniquely
> > > > > > > > >> > > identifies
> > > > > > > > >> > > > a message from an offset that is a position. In some
> > > > cases,
> > > > > > > > offsets
> > > > > > > > >> > > > returned from position() could be actual consumed
> > > messages
> > > > > by
> > > > > > > this
> > > > > > > > >> > > consumer
> > > > > > > > >> > > > identified by {offset, leader epoch}. In other
> cases,
> > > > > > position()
> > > > > > > > >> > returns
> > > > > > > > >> > > > offset that was not actually consumed. Suppose, the
> > user
> > > > > calls
> > > > > > > > >> > position()
> > > > > > > > >> > > > for the last offset. Suppose we return {offset,
> leader
> > > > > epoch}
> > > > > > of
> > > > > > > > the
> > > > > > > > >> > > > message currently in the log. Then, the message gets
> > > > > truncated
> > > > > > > > >> before
> > > > > > > > >> > > > consumer’s first poll(). It does not make sense for
> > > poll()
> > > > > to
> > > > > > > fail
> > > > > > > > >> in
> > > > > > > > >> > > this
> > > > > > > > >> > > > case, because the log truncation did not actually
> > happen
> > > > > from
> > > > > > > the
> > > > > > > > >> > > consumer
> > > > > > > > >> > > > perspective. On the other hand, as the KIP proposes,
> > it
> > > > > makes
> > > > > > > > sense
> > > > > > > > >> for
> > > > > > > > >> > > the
> > > > > > > > >> > > > committed() method to return {offset, leader epoch}
> > > > because
> > > > > > > those
> > > > > > > > >> > offsets
> > > > > > > > >> > > > represent actual consumed messages.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > The same argument applies to the seek() method — we
> > are
> > > > not
> > > > > > > > seeking
> > > > > > > > >> to
> > > > > > > > >> > a
> > > > > > > > >> > > > message, we are seeking to a position.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > I like the proposal to add
> KafkaConsumer#findOffsets()
> > > > API.
> > > > > I
> > > > > > am
> > > > > > > > >> > assuming
> > > > > > > > >> > > > something like:
> > > > > > > > >> > > >
> > > > > > > > >> > > > Map<TopicPartition, Long>
> > > findOffsets(Map<TopicPartition,
> > > > > > > > >> > OffsetAndEpoch>
> > > > > > > > >> > > > offsetsToSearch)
> > > > > > > > >> > > >
> > > > > > > > >> > > > Similar to seek() and position(), I think
> > findOffsets()
> > > > > should
> > > > > > > > >> return
> > > > > > > > >> > > > offset without leader epoch, because what we want is
> > the
> > > > > > offset
> > > > > > > > >> that we
> > > > > > > > >> > > > think is closest to the not divergent message from
> the
> > > > given
> > > > > > > > >> consumed
> > > > > > > > >> > > > message. Until the consumer actually fetches the
> > > message,
> > > > we
> > > > > > > > should
> > > > > > > > >> not
> > > > > > > > >> > > let
> > > > > > > > >> > > > the consumer store the leader epoch for a message it
> > did
> > > > not
> > > > > > > > >> consume.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > So, the workflow will be:
> > > > > > > > >> > > >
> > > > > > > > >> > > > 1) The user gets LogTruncationException with
> {offset,
> > > > leader
> > > > > > > epoch
> > > > > > > > >> of
> > > > > > > > >> > the
> > > > > > > > >> > > > previous message} (whatever we send with new
> > > FetchRecords
> > > > > > > > request).
> > > > > > > > >> > > >
> > > > > > > > >> > > > 2) offset = findOffsets(tp -> {offset, leader
> epoch})
> > > > > > > > >> > > >
> > > > > > > > >> > > > 3) seek(offset)
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > For the use-case where the users store committed
> > offsets
> > > > > > > > externally:
> > > > > > > > >> > > >
> > > > > > > > >> > > > 1) Such users would have to track the leader epoch
> > > > together
> > > > > > with
> > > > > > > > an
> > > > > > > > >> > > offset.
> > > > > > > > >> > > > Otherwise, there is no way to detect later what
> leader
> > > > epoch
> > > > > > was
> > > > > > > > >> > > associated
> > > > > > > > >> > > > with the message. I think it’s reasonable to ask
> that
> > > from
> > > > > > users
> > > > > > > > if
> > > > > > > > >> > they
> > > > > > > > >> > > > want to detect log truncation. Otherwise, they will
> > get
> > > > the
> > > > > > > > current
> > > > > > > > >> > > > behavior.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > If the users currently get an offset to be stored
> > using
> > > > > > > > position(),
> > > > > > > > >> I
> > > > > > > > >> > see
> > > > > > > > >> > > > two possibilities. First, they call save offset
> > returned
> > > > > from
> > > > > > > > >> > position()
> > > > > > > > >> > > > that they call before poll(). In that case, it would
> > not
> > > > be
> > > > > > > > correct
> > > > > > > > >> to
> > > > > > > > >> > > > store {offset, leader epoch} if we would have
> changed
> > > > > > position()
> > > > > > > > to
> > > > > > > > >> > > return
> > > > > > > > >> > > > {offset, leader epoch} since actual fetched message
> > > could
> > > > be
> > > > > > > > >> different
> > > > > > > > >> > > > (from the example I described earlier). So, it would
> > be
> > > > more
> > > > > > > > >> correct to
> > > > > > > > >> > > > call position() after poll(). However, the user
> > already
> > > > gets
> > > > > > > > >> > > > ConsumerRecords at this point, from which the user
> can
> > > > > extract
> > > > > > > > >> {offset,
> > > > > > > > >> > > > leader epoch} of the last message.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > So, I like the idea of adding a helper method to
> > > > > > > ConsumerRecords,
> > > > > > > > as
> > > > > > > > >> > > Jason
> > > > > > > > >> > > > proposed, something like:
> > > > > > > > >> > > >
> > > > > > > > >> > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(),
> > where
> > > > > > > > >> OffsetAndEpoch
> > > > > > > > >> > > is
> > > > > > > > >> > > > a data struct holding {offset, leader epoch}.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > In this case, we would advise the user to follow the
> > > > > workflow:
> > > > > > > > >> poll(),
> > > > > > > > >> > > get
> > > > > > > > >> > > > {offset, leader epoch} from
> > > ConsumerRecords#lastOffsetWith
> > > > > > > > >> > LeaderEpoch(),
> > > > > > > > >> > > > save offset and leader epoch, process records.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > 2) When the user needs to seek to the last committed
> > > > offset,
> > > > > > > they
> > > > > > > > >> call
> > > > > > > > >> > > new
> > > > > > > > >> > > > findOffsets(saved offset, leader epoch), and then
> > > > > > seek(offset).
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > What do you think?
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > Thanks,
> > > > > > > > >> > > >
> > > > > > > > >> > > > Anna
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <
> > > > > lindon...@gmail.com>
> > > > > > > > >> wrote:
> > > > > > > > >> > > >
> > > > > > > > >> > > > > Hey Jason,
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Thanks much for your thoughtful explanation.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Yes the solution using findOffsets(offset,
> > > leaderEpoch)
> > > > > also
> > > > > > > > >> works.
> > > > > > > > >> > The
> > > > > > > > >> > > > > advantage of this solution it adds only one API
> > > instead
> > > > of
> > > > > > two
> > > > > > > > >> APIs.
> > > > > > > > >> > > The
> > > > > > > > >> > > > > concern is that its usage seems a bit more clumsy
> > for
> > > > > > advanced
> > > > > > > > >> users.
> > > > > > > > >> > > > More
> > > > > > > > >> > > > > specifically, advanced users who store offsets
> > > > externally
> > > > > > will
> > > > > > > > >> always
> > > > > > > > >> > > > need
> > > > > > > > >> > > > > to call findOffsets() before calling seek(offset)
> > > during
> > > > > > > > consumer
> > > > > > > > >> > > > > initialization. And those advanced users will need
> > to
> > > > > > manually
> > > > > > > > >> keep
> > > > > > > > >> > > track
> > > > > > > > >> > > > > of the leaderEpoch of the last ConsumerRecord.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > The other solution may be more user-friendly for
> > > > advanced
> > > > > > > users
> > > > > > > > >> is to
> > > > > > > > >> > > add
> > > > > > > > >> > > > > two APIs, `void seek(offset, leaderEpoch)` and
> > > `(offset,
> > > > > > > epoch)
> > > > > > > > =
> > > > > > > > >> > > > > offsetEpochs(topicPartition)`.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > I kind of prefer the second solution because it is
> > > > easier
> > > > > to
> > > > > > > use
> > > > > > > > >> for
> > > > > > > > >> > > > > advanced users. If we need to expose leaderEpoch
> > > anyway
> > > > to
> > > > > > > > safely
> > > > > > > > >> > > > identify
> > > > > > > > >> > > > > a message, it may be conceptually simpler to
> expose
> > it
> > > > > > > directly
> > > > > > > > in
> > > > > > > > >> > > > > seek(...) rather than requiring one more
> translation
> > > > using
> > > > > > > > >> > > > > findOffsets(...). But I am also OK with the first
> > > > solution
> > > > > > if
> > > > > > > > >> other
> > > > > > > > >> > > > > developers also favor that one :)
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Thanks,
> > > > > > > > >> > > > > Dong
> > > > > > > > >> > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <
> > > > > > > > >> ja...@confluent.io
> > > > > > > > >> > >
> > > > > > > > >> > > > > wrote:
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > Hi Dong,
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Thanks, I've been thinking about your
> suggestions
> > a
> > > > bit.
> > > > > > It
> > > > > > > is
> > > > > > > > >> > > > > challenging
> > > > > > > > >> > > > > > to make this work given the current APIs. One of
> > the
> > > > > > > > >> difficulties
> > > > > > > > >> > is
> > > > > > > > >> > > > that
> > > > > > > > >> > > > > > we don't have an API to find the leader epoch
> for
> > a
> > > > > given
> > > > > > > > >> offset at
> > > > > > > > >> > > the
> > > > > > > > >> > > > > > moment. So if the user does a seek to offset 5,
> > then
> > > > > we'll
> > > > > > > > need
> > > > > > > > >> a
> > > > > > > > >> > new
> > > > > > > > >> > > > API
> > > > > > > > >> > > > > > to find the corresponding epoch in order to
> > fulfill
> > > > the
> > > > > > new
> > > > > > > > >> > > position()
> > > > > > > > >> > > > > API.
> > > > > > > > >> > > > > > Potentially we could modify ListOffsets to
> enable
> > > > > finding
> > > > > > > the
> > > > > > > > >> > leader
> > > > > > > > >> > > > > epoch,
> > > > > > > > >> > > > > > but I am not sure it is worthwhile. Perhaps it
> is
> > > > > > reasonable
> > > > > > > > for
> > > > > > > > >> > > > advanced
> > > > > > > > >> > > > > > usage to expect that the epoch information, if
> > > needed,
> > > > > > will
> > > > > > > be
> > > > > > > > >> > > > extracted
> > > > > > > > >> > > > > > from the records directly? It might make sense
> to
> > > > > expose a
> > > > > > > > >> helper
> > > > > > > > >> > in
> > > > > > > > >> > > > > > `ConsumerRecords` to make this a little easier
> > > though.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Alternatively, if we think it is important to
> have
> > > > this
> > > > > > > > >> information
> > > > > > > > >> > > > > exposed
> > > > > > > > >> > > > > > directly, we could create batch APIs to solve
> the
> > > > naming
> > > > > > > > >> problem.
> > > > > > > > >> > For
> > > > > > > > >> > > > > > example:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Map<TopicPartition, OffsetAndEpoch> positions();
> > > > > > > > >> > > > > > void seek(Map<TopicPartition, OffsetAndEpoch>
> > > > > positions);
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > However, I'm actually leaning toward leaving the
> > > > seek()
> > > > > > and
> > > > > > > > >> > > position()
> > > > > > > > >> > > > > APIs
> > > > > > > > >> > > > > > unchanged. Instead, we can add a new API to
> search
> > > for
> > > > > > > offset
> > > > > > > > by
> > > > > > > > >> > > > > timestamp
> > > > > > > > >> > > > > > or by offset/leader epoch. Let's say we call it
> > > > > > > `findOffsets`.
> > > > > > > > >> If
> > > > > > > > >> > the
> > > > > > > > >> > > > > user
> > > > > > > > >> > > > > > hits a log truncation error, they can use this
> API
> > > to
> > > > > find
> > > > > > > the
> > > > > > > > >> > > closest
> > > > > > > > >> > > > > > offset and then do a seek(). At the same time,
> we
> > > > > > deprecate
> > > > > > > > the
> > > > > > > > >> > > > > > `offsetsForTimes` APIs. We now have two use
> cases
> > > > which
> > > > > > > > require
> > > > > > > > >> > > finding
> > > > > > > > >> > > > > > offsets, so I think we should make this API
> > general
> > > > and
> > > > > > > leave
> > > > > > > > >> the
> > > > > > > > >> > > door
> > > > > > > > >> > > > > open
> > > > > > > > >> > > > > > for future extensions.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > By the way, I'm unclear about the desire to move
> > > part
> > > > of
> > > > > > > this
> > > > > > > > >> > > > > functionality
> > > > > > > > >> > > > > > to AdminClient. Guozhang suggested this
> > previously,
> > > > but
> > > > > I
> > > > > > > > think
> > > > > > > > >> it
> > > > > > > > >> > > only
> > > > > > > > >> > > > > > makes sense for cross-cutting capabilities such
> as
> > > > topic
> > > > > > > > >> creation.
> > > > > > > > >> > If
> > > > > > > > >> > > > we
> > > > > > > > >> > > > > > have an API which is primarily useful by
> > consumers,
> > > > > then I
> > > > > > > > think
> > > > > > > > >> > > that's
> > > > > > > > >> > > > > > where it should be exposed. The AdminClient also
> > has
> > > > its
> > > > > > own
> > > > > > > > API
> > > > > > > > >> > > > > integrity
> > > > > > > > >> > > > > > and should not become a dumping ground for
> > advanced
> > > > use
> > > > > > > cases.
> > > > > > > > >> I'll
> > > > > > > > >> > > > > update
> > > > > > > > >> > > > > > the KIP with the  `findOffsets` API suggested
> > above
> > > > and
> > > > > we
> > > > > > > can
> > > > > > > > >> see
> > > > > > > > >> > if
> > > > > > > > >> > > > it
> > > > > > > > >> > > > > > does a good enough job of keeping the API simple
> > for
> > > > > > common
> > > > > > > > >> cases.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Thanks,
> > > > > > > > >> > > > > > Jason
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <
> > > > > > > > lindon...@gmail.com>
> > > > > > > > >> > > wrote:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > > Hey Jason,
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Regarding seek(...), it seems that we want an
> > API
> > > > for
> > > > > > user
> > > > > > > > to
> > > > > > > > >> > > > > initialize
> > > > > > > > >> > > > > > > consumer with (offset, leaderEpoch) and that
> API
> > > > > should
> > > > > > > > allow
> > > > > > > > >> > > > throwing
> > > > > > > > >> > > > > > > PartitionTruncationException. Suppose we agree
> > on
> > > > > this,
> > > > > > > then
> > > > > > > > >> > > > > > > seekToNearest() is not sufficient because it
> > will
> > > > > always
> > > > > > > > >> swallow
> > > > > > > > >> > > > > > > PartitionTruncationException. Here we have two
> > > > > options.
> > > > > > > The
> > > > > > > > >> first
> > > > > > > > >> > > > > option
> > > > > > > > >> > > > > > is
> > > > > > > > >> > > > > > > to add API offsetsForLeaderEpochs() to
> translate
> > > > > > > > (leaderEpoch,
> > > > > > > > >> > > > offset)
> > > > > > > > >> > > > > to
> > > > > > > > >> > > > > > > offset. The second option is to have add
> > > > seek(offset,
> > > > > > > > >> > leaderEpoch).
> > > > > > > > >> > > > It
> > > > > > > > >> > > > > > > seems that second option may be more simpler
> > > because
> > > > > it
> > > > > > > > makes
> > > > > > > > >> it
> > > > > > > > >> > > > clear
> > > > > > > > >> > > > > > that
> > > > > > > > >> > > > > > > (offset, leaderEpoch) will be used to identify
> > > > > > consumer's
> > > > > > > > >> > position
> > > > > > > > >> > > > in a
> > > > > > > > >> > > > > > > partition. And user only needs to handle
> > > > > > > > >> > > PartitionTruncationException
> > > > > > > > >> > > > > > from
> > > > > > > > >> > > > > > > the poll(). In comparison the first option
> > seems a
> > > > bit
> > > > > > > > harder
> > > > > > > > >> to
> > > > > > > > >> > > use
> > > > > > > > >> > > > > > > because user have to also handle the
> > > > > > > > >> PartitionTruncationException
> > > > > > > > >> > > if
> > > > > > > > >> > > > > > > offsetsForLeaderEpochs() returns different
> > offset
> > > > from
> > > > > > > > >> > > user-provided
> > > > > > > > >> > > > > > > offset. What do you think?
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > If we decide to add API seek(offset,
> > leaderEpoch),
> > > > > then
> > > > > > we
> > > > > > > > can
> > > > > > > > >> > > decide
> > > > > > > > >> > > > > > > whether and how to add API to translate
> (offset,
> > > > > > > > leaderEpoch)
> > > > > > > > >> to
> > > > > > > > >> > > > > offset.
> > > > > > > > >> > > > > > It
> > > > > > > > >> > > > > > > seems that this API will be needed by advanced
> > > user
> > > > to
> > > > > > > don't
> > > > > > > > >> want
> > > > > > > > >> > > > auto
> > > > > > > > >> > > > > > > offset reset (so that it can be notified) but
> > > still
> > > > > > wants
> > > > > > > to
> > > > > > > > >> > reset
> > > > > > > > >> > > > > offset
> > > > > > > > >> > > > > > > to closest. For those users if probably makes
> > > sense
> > > > to
> > > > > > > only
> > > > > > > > >> have
> > > > > > > > >> > > the
> > > > > > > > >> > > > > API
> > > > > > > > >> > > > > > in
> > > > > > > > >> > > > > > > AdminClient. offsetsForTimes() seems like a
> > common
> > > > API
> > > > > > > that
> > > > > > > > >> will
> > > > > > > > >> > be
> > > > > > > > >> > > > > > needed
> > > > > > > > >> > > > > > > by user's of consumer in general, so it may be
> > > more
> > > > > > > > >> reasonable to
> > > > > > > > >> > > > stay
> > > > > > > > >> > > > > in
> > > > > > > > >> > > > > > > the consumer API. I don't have a strong
> opinion
> > on
> > > > > > whether
> > > > > > > > >> > > > > > > offsetsForTimes() should be replaced by API in
> > > > > > > AdminClient.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Though (offset, leaderEpoch) is needed to
> > uniquely
> > > > > > > identify
> > > > > > > > a
> > > > > > > > >> > > message
> > > > > > > > >> > > > > in
> > > > > > > > >> > > > > > > general, it is only needed for advanced users
> > who
> > > > has
> > > > > > > turned
> > > > > > > > >> on
> > > > > > > > >> > > > unclean
> > > > > > > > >> > > > > > > leader election, need to use seek(..), and
> don't
> > > > want
> > > > > > auto
> > > > > > > > >> offset
> > > > > > > > >> > > > > reset.
> > > > > > > > >> > > > > > > Most other users probably just want to enable
> > auto
> > > > > > offset
> > > > > > > > >> reset
> > > > > > > > >> > and
> > > > > > > > >> > > > > store
> > > > > > > > >> > > > > > > offset in Kafka. Thus we might want to keep
> the
> > > > > existing
> > > > > > > > >> > > offset-only
> > > > > > > > >> > > > > APIs
> > > > > > > > >> > > > > > > (e.g. seek() and position()) for most users
> > while
> > > > > adding
> > > > > > > new
> > > > > > > > >> APIs
> > > > > > > > >> > > for
> > > > > > > > >> > > > > > > advanced users. And yes, it seems that we need
> > new
> > > > > name
> > > > > > > for
> > > > > > > > >> > > > position().
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Though I think we need new APIs to carry the
> new
> > > > > > > information
> > > > > > > > >> > (e.g.
> > > > > > > > >> > > > > > > leaderEpoch), I am not very sure how that
> should
> > > > look
> > > > > > > like.
> > > > > > > > >> One
> > > > > > > > >> > > > > possible
> > > > > > > > >> > > > > > > option is those APIs in KIP-232. Another
> option
> > is
> > > > > > > something
> > > > > > > > >> like
> > > > > > > > >> > > > this:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > `````
> > > > > > > > >> > > > > > > class OffsetEpochs {
> > > > > > > > >> > > > > > >   long offset;
> > > > > > > > >> > > > > > >   int leaderEpoch;
> > > > > > > > >> > > > > > >   int partitionEpoch;   // This may be needed
> > > later
> > > > as
> > > > > > > > >> discussed
> > > > > > > > >> > in
> > > > > > > > >> > > > > > KIP-232
> > > > > > > > >> > > > > > >   ... // Hopefully these are all we need to
> > > identify
> > > > > > > message
> > > > > > > > >> in
> > > > > > > > >> > > > Kafka.
> > > > > > > > >> > > > > > But
> > > > > > > > >> > > > > > > if we need more then we can add new fields in
> > this
> > > > > > class.
> > > > > > > > >> > > > > > > }
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > OffsetEpochs offsetEpochs(TopicPartition);
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > void seek(TopicPartition, OffsetEpochs);
> > > > > > > > >> > > > > > > ``````
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Thanks,
> > > > > > > > >> > > > > > > Dong
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason
> > Gustafson
> > > <
> > > > > > > > >> > > > ja...@confluent.io>
> > > > > > > > >> > > > > > > wrote:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > > Hey Dong,
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Thanks for the feedback. The first three
> > points
> > > > are
> > > > > > > easy:
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > 1. Yes, we should be consistent.
> > > > > > > > >> > > > > > > > 2. Yes, I will add this.
> > > > > > > > >> > > > > > > > 3. Yes, I think we should document the
> changes
> > > to
> > > > > the
> > > > > > > > >> committed
> > > > > > > > >> > > > > offset
> > > > > > > > >> > > > > > > > schema. I meant to do this, but it slipped
> my
> > > > mind.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > The latter questions are tougher. One
> option I
> > > was
> > > > > > > > >> considering
> > > > > > > > >> > is
> > > > > > > > >> > > > to
> > > > > > > > >> > > > > > have
> > > > > > > > >> > > > > > > > only `offsetsForLeaderEpochs` exposed from
> the
> > > > > > consumer
> > > > > > > > and
> > > > > > > > >> to
> > > > > > > > >> > > drop
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > > new
> > > > > > > > >> > > > > > > > seek() API. That seems more consistent with
> > the
> > > > > > current
> > > > > > > > use
> > > > > > > > >> of
> > > > > > > > >> > > > > > > > `offsetsForTimes` (we don't have a separate
> > > > > > > > >> `seekToTimestamp`
> > > > > > > > >> > > API).
> > > > > > > > >> > > > > An
> > > > > > > > >> > > > > > > > alternative might be to take a page from the
> > > > > > AdminClient
> > > > > > > > API
> > > > > > > > >> > and
> > > > > > > > >> > > > add
> > > > > > > > >> > > > > a
> > > > > > > > >> > > > > > > new
> > > > > > > > >> > > > > > > > method to generalize offset lookup. For
> > example,
> > > > we
> > > > > > > could
> > > > > > > > >> have
> > > > > > > > >> > > > > > > > `lookupOffsets(LookupOptions)`. We could
> then
> > > > > > deprecate
> > > > > > > > >> > > > > > > `offsetsForTimes`
> > > > > > > > >> > > > > > > > and this would open the door for future
> > > extensions
> > > > > > > without
> > > > > > > > >> > > needing
> > > > > > > > >> > > > > new
> > > > > > > > >> > > > > > > > APIs.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > The case of position() is a little more
> > > annoying.
> > > > It
> > > > > > > would
> > > > > > > > >> have
> > > > > > > > >> > > > been
> > > > > > > > >> > > > > > > better
> > > > > > > > >> > > > > > > > had we let this return an object so that it
> is
> > > > > easier
> > > > > > to
> > > > > > > > >> > extend.
> > > > > > > > >> > > > This
> > > > > > > > >> > > > > > is
> > > > > > > > >> > > > > > > > the only reason I didn't add the API to the
> > KIP.
> > > > > Maybe
> > > > > > > we
> > > > > > > > >> > should
> > > > > > > > >> > > > bite
> > > > > > > > >> > > > > > the
> > > > > > > > >> > > > > > > > bullet and fix this now? Unfortunately we'll
> > > have
> > > > to
> > > > > > > come
> > > > > > > > up
> > > > > > > > >> > > with a
> > > > > > > > >> > > > > new
> > > > > > > > >> > > > > > > > name. Maybe `currentPosition`?
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Thoughts?
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > -Jason
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >
> > On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > Regarding points 4) and 5) above, the motivation for the alternative APIs
> > is that, if we decide that leaderEpoch is equally important as the offset
> > in identifying a message, then it may be reasonable to always specify it
> > wherever an offset is currently required in the consumer API to identify
> > a message, e.g. position(), seek(). For example, since we allow the user
> > to retrieve the offset using position() instead of asking the user to
> > keep track of the offset of the latest ConsumerRecord, maybe it would be
> > more consistent for the user to also retrieve the leaderEpoch using
> > position()?
> >
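For illustration only: a minimal sketch of the kind of value object an epoch-aware position lookup could return, along the lines Dong suggests. Neither this class nor a consumer method returning it exists; it just shows the offset and leader epoch travelling together.

    import java.util.Optional;

    // Hypothetical value object: what position() (or a new currentPosition())
    // could return if it carried the leader epoch next to the offset.
    public class FetchPosition {
        public final long offset;
        public final Optional<Integer> leaderEpoch; // empty if the epoch is unknown

        public FetchPosition(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }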
> > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hey Jason,
> >
> > Thanks for the update. It looks pretty good. Just some minor comments
> > below:
> >
> > 1) The KIP adds the new error code "LOG_TRUNCATION" and the new exception
> > TruncatedPartitionException. Can we make the name more consistent, e.g.
> > LogTruncationException?
> >
> > 2) Do we need to add UnknownLeaderEpochException as part of the API
> > change?
> >
> > 3) Not sure if the offset topic schema is also public API. If so, maybe
> > we should also include the schema change in the API?
> >
> > 4) For users who store offsets externally, currently they get the offset
> > using position(..), store the offset externally, and use seek(..) to
> > initialize the consumer next time. After this KIP they will need to store
> > and use the leaderEpoch together with the offset. Should we also update
> > the API so that the user can get the leaderEpoch from position(...)? Not
> > sure if it is OK to ask users to track the latest leaderEpoch of the
> > ConsumerRecord by themselves.
> >
> > 5) Also for users who store offsets externally, they need to call
> > seek(..) with the leaderEpoch to initialize the consumer. With the
> > current KIP, users need to call seekToNearest(), whose name suggests that
> > the final position may be different from what was requested. However, if
> > users want to avoid auto offset reset and be notified explicitly when
> > there is log truncation, then seekToNearest() probably does not help
> > here. Would it make sense to replace seekToNearest() with seek(offset,
> > leaderEpoch) + AdminClient.offsetsForLeaderEpochs(...)?
> >
> > Thanks,
> > Dong
> >
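For illustration only: a rough sketch of the external-offset-storage workflow described in points 4) and 5) above. Only the plain position()/seek() calls exist in the current consumer API; the epoch-aware seek shown in the comment is a placeholder for whatever shape the KIP finally settles on.

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ExternalOffsetStore {

        // After processing a record, persist the next offset (and, after this
        // KIP, the leader epoch of the record) atomically with the application's
        // own output.
        void saveProgress(ConsumerRecord<byte[], byte[]> record,
                          Optional<Integer> leaderEpoch) {
            long nextOffset = record.offset() + 1;
            // store (record.topic(), record.partition(), nextOffset, leaderEpoch)
            // in the external system, e.g. in the same transaction as the results.
        }

        // On startup, restore the consumer's position from the external store.
        void restore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp,
                     long storedOffset, Optional<Integer> storedEpoch) {
            // A plain seek works today, but it cannot detect log truncation.
            consumer.seek(tp, storedOffset);
            // With this KIP the stored epoch would be supplied as well so that
            // truncation can be detected, e.g. (hypothetical signature):
            // consumer.seek(tp, storedOffset, storedEpoch);
        }
    }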
> > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > Hey Guozhang,
> >
> > That's fair. In fact, perhaps we do not need this API at all. We already
> > have the new seek() in this KIP which can do the lookup based on epoch
> > for this use case. I guess we should probably call it seekToNearest()
> > though to make it clear that the final position may be different from
> > what was requested.
> >
> > Thanks,
> > Jason
> >
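For illustration only: how the seekToNearest() semantics above might look from the caller's side. The method itself is a proposal in this thread, not an existing consumer API, so the call is left in a comment.

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public class SeekToNearestExample {

        void resume(Consumer<?, ?> consumer, TopicPartition tp,
                    long storedOffset, int storedLeaderEpoch) {
            // Proposed: consumer.seekToNearest(tp, storedOffset, storedLeaderEpoch);
            // If the log was truncated past storedOffset (or the epoch is
            // unknown), the consumer would land on the closest valid offset, so
            // the resulting position can differ from the one requested:
            long actual = consumer.position(tp);
            if (actual != storedOffset) {
                // truncation moved us; log it or apply application-level recovery
            }
        }
    }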
> > On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hi Jason,
> >
> > I think it is less worthwhile to add
> > KafkaConsumer#offsetsForLeaderEpochs, since probably only very advanced
> > users are aware of the leaderEpoch, and hence would ever care to use it
> > anyway. It is more like an admin client operation than a consumer client
> > operation: if the motivation is to facilitate a customized reset policy,
> > maybe adding it as AdminClient#offsetsForLeaderEpochs is better, since it
> > is not an aggressive assumption that such advanced users are willing to
> > use an admin client to get further information.
> >
> > Guozhang
> >
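For illustration only: the admin-client flavoured alternative above, sketched as a custom reset. No AdminClient#offsetsForLeaderEpochs exists; the epochEndOffsets parameter below stands in for whatever such a lookup would return (the end offset of each leader epoch the application last saw).

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public class CustomTruncationReset {

        // epochEndOffsets: per partition, the end offset of the last leader
        // epoch the application observed (as a hypothetical admin lookup might
        // return it).
        void resetToTruncationPoint(Consumer<?, ?> consumer,
                                    Map<TopicPartition, Long> epochEndOffsets) {
            epochEndOffsets.forEach((tp, epochEndOffset) -> {
                long current = consumer.position(tp);
                if (current > epochEndOffset) {
                    // Rewind to the (approximate) truncation point rather than
                    // jumping to "earliest" or "latest".
                    consumer.seek(tp, epochEndOffset);
                }
            });
        }
    }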
> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > Thanks for the feedback. I've updated the KIP. Specifically, I removed
> > the "closest" reset option and the proposal to reset by timestamp when
> > the precise truncation point cannot be determined. Instead, I proposed
> > that we always reset using the nearest epoch when a reset policy is
> > defined (either "earliest" or "latest"). Does that sound reasonable?
> >
> > One thing I am still debating is whether it would be better to have a
> > separate API to find the closest offset using the leader epoch. In the
> > current KIP, I suggested to piggyback this information on an exception,
> > but I'm beginning to think it would be better not to hide the lookup. It
> > is awkward to implement since it means delaying the exception, and the
> > API may actually be useful when customizing reset logic if no auto reset
> > policy
> >
> --
> -- Guozhang
>
