Hi Jason,

I also like your proposal and agree that KafkaConsumer#seekToCommitted() is
a more intuitive way to initialize both the consumer's position and its
fetch state.


My understanding is that KafkaConsumer#seekToCommitted() is purely for
clients who store their offsets externally, right? And we are still going to
add KafkaConsumer#findOffsets() in this KIP, as we discussed, so that the
client can handle LogTruncationException?
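
To make the question concrete, here is a rough Java sketch of how the two
calls might fit together for a client that stores offsets externally.
Everything in it is an assumption for illustration: the ProposedConsumer
interface, the StoredOffset class, the String partition key, and the
in-memory StubConsumer are hypothetical, and none of these signatures exist
in KafkaConsumer today. The stub also assumes that a plain seek() leaves no
fetch state behind.

```java
import java.util.HashMap;
import java.util.Map;

public class ExternalOffsetSketch {

    /** Offset plus leader epoch of the last consumed record; epoch is null if unknown. */
    static final class StoredOffset {
        final long offset;
        final Integer leaderEpoch;

        StoredOffset(long offset, Integer leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    /** Hypothetical subset of the consumer API discussed in this thread. */
    interface ProposedConsumer {
        void seekToCommitted(String partition, StoredOffset committed);
        long findOffsets(String partition, StoredOffset lastFetched);
        void seek(String partition, long offset);
        long position(String partition);
    }

    /** In-memory stub so the sketch runs without a broker. */
    static final class StubConsumer implements ProposedConsumer {
        private final Map<String, Long> positions = new HashMap<>();
        private final Map<String, Integer> fetchEpochs = new HashMap<>();
        private final long logEndAfterTruncation;

        StubConsumer(long logEndAfterTruncation) {
            this.logEndAfterTruncation = logEndAfterTruncation;
        }

        @Override
        public void seekToCommitted(String partition, StoredOffset committed) {
            // Initializes both the position and the fetch state (last fetched epoch).
            positions.put(partition, committed.offset);
            if (committed.leaderEpoch != null) {
                fetchEpochs.put(partition, committed.leaderEpoch);
            } else {
                fetchEpochs.remove(partition);
            }
        }

        @Override
        public long findOffsets(String partition, StoredOffset lastFetched) {
            // Stubbed: a real consumer would ask the broker for the epoch's end offset.
            return Math.min(lastFetched.offset, logEndAfterTruncation);
        }

        @Override
        public void seek(String partition, long offset) {
            // Assumption: a plain seek sets only the position and drops the fetch state.
            positions.put(partition, offset);
            fetchEpochs.remove(partition);
        }

        @Override
        public long position(String partition) {
            return positions.get(partition);
        }

        Integer fetchEpoch(String partition) {
            return fetchEpochs.get(partition);
        }
    }

    /** Startup path for a client that stores offsets externally. */
    static long restore(ProposedConsumer consumer, String partition, StoredOffset stored) {
        consumer.seekToCommitted(partition, stored);
        return consumer.position(partition);
    }

    /** Recovery path after a truncation error is surfaced to the client. */
    static long recover(ProposedConsumer consumer, String partition, StoredOffset lastFetched) {
        long safeOffset = consumer.findOffsets(partition, lastFetched);
        consumer.seek(partition, safeOffset);
        return consumer.position(partition);
    }

    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer(2L);
        StoredOffset stored = new StoredOffset(3L, 1);
        System.out.println("restored position: " + restore(consumer, "topic-0", stored));
        System.out.println("position after recovery: " + recover(consumer, "topic-0", stored));
    }
}
```

If this matches what you have in mind, seekToCommitted() covers the startup
path and findOffsets() followed by seek() covers recovery after a
LogTruncationException.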


Thanks,

Anna


On Thu, Jul 12, 2018 at 3:57 PM Dong Lin <lindon...@gmail.com> wrote:

> Hey Jason,
>
> It is a great summary. The solution sounds good. I might have minor
> comments regarding the method name, but we can discuss those minor points
> later, after we reach consensus on the high-level API.
>
> Thanks,
> Dong
>
>
> On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Anna and Dong,
> >
> > Thanks a lot for the great discussion. I've been hanging back a bit because
> > honestly the best option hasn't seemed clear. I agree with Anna's general
> > observation that there is a distinction between the position of the
> > consumer and its fetch state up to that position. If you think about it, a
> > committed offset actually represents both of these. The metadata is used to
> > initialize the state of the consumer application and the offset initializes
> > the position. Additionally, we are extending the offset commit in this KIP
> > to also include the last epoch fetched by the consumer, which is used to
> > initialize the internal fetch state. Of course if you do an arbitrary
> > `seek` and immediately commit offsets, then there won't be a last epoch to
> > commit. This seems intuitive since there is no fetch state in this case. We
> > only commit fetch state when we have it.
> >
> > So if we think about a committed offset as initializing both the consumer's
> > position and its fetch state, then the gap in the API is evidently that we
> > don't have a way to initialize the consumer to a committed offset. We do it
> > implicitly of course for offsets stored in Kafka, but since external
> > storage is a use case we support, then we should have an explicit API as
> > well. Perhaps something like this:
> >
> > seekToCommitted(TopicPartition, OffsetAndMetadata)
> >
> > In this KIP, we are proposing to allow the `OffsetAndMetadata` object to
> > include the leader epoch, so I think this would have the same effect as
> > Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given
> > the current API? Furthermore, if we find a need for additional metadata in
> > the offset commit API in the future, then we will just need to modify the
> > `OffsetAndMetadata` object and we will not need a new `seek` API.
> >
> > With this approach, I think then we can leave the `position` API as it is.
> > The position of the consumer is still just the next expected fetch offset.
> > If a user needs to record additional state based on previous fetch
> > progress, then they would use the result of the previous fetch to obtain
> > it. This makes the dependence on fetch progress explicit. I think we could
> > make this a little more convenient with a helper in the `ConsumerRecords`
> > object, but I think that's more of a nice-to-have.
> >
> > Thoughts?
> >
> > By the way, I have been iterating a little bit on the replica side of this
> > KIP. My initial proposal in fact did not have strong enough fencing to
> > protect all of the edge cases. I believe the current proposal fixes the
> > problems, but I am still verifying the model.
> >
> > Thanks,
> > Jason
> >
> >
> > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Anna,
> > >
> > > Thanks much for the explanation. Approach 1 also sounds good to me. I think
> > > findOffsets() is useful for users who don't use an automatic offset reset
> > > policy.
> > >
> > > Just one more question. Since users who store offsets externally need to
> > > provide leaderEpoch to findOffsets(...), do we need an extra API for users
> > > to get both offset and leaderEpoch, e.g. recordPosition()?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner <a...@confluent.io>
> > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > >
> > > > What I called “not covering all use cases” is what you call
> best-effort
> > > > (not guaranteeing some corner cases). I think we are on the same page
> > > here.
> > > >
> > > >
> > > > I wanted to be clear in the API whether the consumer seeks to a
> > position
> > > > (offset) or to a record (offset, leader epoch). The only use-case of
> > > > seeking to a record is seeking to a committed offset for a user who
> > > stores
> > > > committed offsets externally. (Unless users find some other reason to
> > > seek
> > > > to a record.) I thought it was possible to provide this functionality
> > > with
> > > > findOffset(offset, leader epoch) followed by a seek(offset). However,
> > you
> > > > are right that this will not handle the race condition where
> > > non-divergent
> > > > offset found by findOffset() could change again before the consumer
> > does
> > > > the first fetch.
> > > >
> > > >
> > > > Regarding position() — if we add position that returns (offset,
> leader
> > > > epoch), this is specifically a position after a record that was
> > actually
> > > > consumed or position of a committed record. In which case, I still
> > think
> > > > it’s cleaner to get a record position of consumed message from a new
> > > helper
> > > > method in ConsumerRecords() or from committed offsets.
> > > >
> > > >
> > > > I think all the use-cases could be then covered with:
> > > >
> > > > (Approach 1)
> > > >
> > > > seekToRecord(offset, leaderEpoch) — this will just initialize/set the
> > > > consumer state;
> > > >
> > > > findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}
> > > >
> > > >
> > > > If we agree that the race condition is also a corner case, then I
> think
> > > we
> > > > can cover use-cases with:
> > > >
> > > > (Approach 2)
> > > >
> > > > findOffsets(offset, leaderEpoch) returns offset — we still want
> leader
> > > > epoch as a parameter for the users who store their committed offsets
> > > > externally.
> > > >
> > > >
> > > > I am actually now leaning more to approach 1, since it is more
> > explicit,
> > > > and maybe there are more use cases for it.
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Anna
> > > >
> > > >
> > > > On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hey Anna,
> > > > >
> > > > > Thanks for the comment. To answer your question, it seems that we can
> > > > > cover all cases in this KIP. As stated in the "Consumer Handling" section,
> > > > > a KIP-101 based approach will be used to derive the truncation offset from
> > > > > the 2-tuple (offset, leaderEpoch). This approach is best effort and it is
> > > > > inaccurate only in very rare scenarios (as described in KIP-279).
> > > > >
> > > > > By using seek(offset, leaderEpoch), the consumer will still be able to
> > > > > follow this best-effort approach to detect log truncation and determine
> > > > > the truncation offset. On the other hand, if we use seek(offset), the
> > > > > consumer will not detect log truncation in some cases, which weakens the
> > > > > guarantee of this KIP. Does this make sense?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <a...@confluent.io>
> > > wrote:
> > > > >
> > > > > > Sorry, I hit "send" before finishing. Continuing...
> > > > > >
> > > > > >
> > > > > > 2) Hiding most of the consumer handling log truncation logic with
> > > > minimal
> > > > > > exposure in KafkaConsumer API.  I was proposing this path.
> > > > > >
> > > > > >
> > > > > > Before answering your specific questions… I want to respond to your
> > > > > > comment “In general, maybe we should discuss the final solution that
> > > > > > covers all cases?”. With the current KIP, we don’t cover all cases of
> > > > > > the consumer detecting log truncation because the KIP proposes a leader
> > > > > > epoch cache in the consumer that does not persist across restarts. Plus,
> > > > > > we only store the last committed offset (either internally or users can
> > > > > > store it externally). This has a limitation that the consumer will not
> > > > > > always be able to find the point of truncation just because we have a
> > > > > > limited history (just one data point).
> > > > > >
> > > > > >
> > > > > > So, maybe we should first agree on whether we accept that storing the
> > > > > > last committed offset/leader epoch has a limitation that the consumer
> > > > > > will not be able to detect log truncation in all cases?
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Anna
> > > > > >
> > > > > > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <a...@confluent.io>
> > > > wrote:
> > > > > >
> > > > > > > Hi Dong,
> > > > > > >
> > > > > > > Thanks for the follow up! I finally have a much clearer understanding
> > > > > > > of where you are coming from.
> > > > > > >
> > > > > > > You are right. The success of findOffsets()/finding a point of
> > > > > > > non-divergence depends on whether we have enough entries in the
> > > > > > > consumer's leader epoch cache. However, I think this is a fundamental
> > > > > > > limitation of having a leader epoch cache that does not persist across
> > > > > > > consumer restarts.
> > > > > > >
> > > > > > > If we consider the general case where the consumer may or may not have
> > > > > > > this cache, then I see two paths:
> > > > > > > 1) Letting the user track the leader epoch history externally, and
> > > > > > > have more exposure to leader epoch and finding the point of
> > > > > > > non-divergence in the KafkaConsumer API. I understand this is the case
> > > > > > > you were talking about.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com
> >
> > > > wrote:
> > > > > > >
> > > > > > >> Hey Anna,
> > > > > > >>
> > > > > > >> Thanks much for your detailed explanation and example! It does
> > > help
> > > > me
> > > > > > >> understand the difference between our understanding.
> > > > > > >>
> > > > > > >> So it seems that the solution based on findOffsets() currently
> > > > focuses
> > > > > > >> mainly on the scenario that consumer has cached leaderEpoch ->
> > > > offset
> > > > > > >> mapping whereas I was thinking about the general case where
> > > consumer
> > > > > may
> > > > > > >> or
> > > > > > >> may not have this cache. I guess that is why we have different
> > > > > > >> understanding here. I have some comments below.
> > > > > > >>
> > > > > > >>
> > > > > > >> 3) The proposed solution using findOffsets(offset,
> leaderEpoch)
> > > > > followed
> > > > > > >> by
> > > > > > >> seek(offset) works if consumer has the cached leaderEpoch ->
> > > offset
> > > > > > >> mapping. But if we assume consumer has this cache, do we need
> to
> > > > have
> > > > > > >> leaderEpoch in the findOffsets(...)? Intuitively, the
> > > > > > findOffsets(offset)
> > > > > > >> can also derive the leaderEpoch using offset just like the
> > > proposed
> > > > > > >> solution does with seek(offset).
> > > > > > >>
> > > > > > >>
> > > > > > >> 4) If consumer does not have cached leaderEpoch -> offset
> > mapping,
> > > > > which
> > > > > > >> is
> > > > > > >> the case if consumer is restarted on a new machine, then it is
> > not
> > > > > clear
> > > > > > >> what leaderEpoch would be included in the FetchRequest if
> > consumer
> > > > > does
> > > > > > >> seek(offset). This is the case that motivates the first
> question
> > > of
> > > > > the
> > > > > > >> previous email. In general, maybe we should discuss the final
> > > > solution
> > > > > > >> that
> > > > > > >> covers all cases?
> > > > > > >>
> > > > > > >>
> > > > > > >> 5) The second question in my previous email is related to the
> > > > > following
> > > > > > >> paragraph:
> > > > > > >>
> > > > > > >> "... In some cases, offsets returned from position() could be
> > > actual
> > > > > > >> consumed messages by this consumer identified by {offset,
> leader
> > > > > epoch}.
> > > > > > >> In
> > > > > > >> other cases, position() returns offset that was not actually
> > > > consumed.
> > > > > > >> Suppose, the user calls position() for the last offset...".
> > > > > > >>
> > > > > > > >> I guess my point is that, if the user calls position() for the last
> > > > > > > >> offset and uses that offset in seek(...), then the user can probably
> > > > > > > >> just call Consumer#seekToEnd() without calling position() and
> > > > > > > >> seek(...). Similarly, the user can call Consumer#seekToBeginning() to
> > > > > > > >> seek to the earliest position without calling position() and
> > > > > > > >> seek(...). Thus position() only needs to return the actual consumed
> > > > > > > >> messages identified by {offset, leader epoch}. Does this make sense?
> > > > > > >>
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Dong
> > > > > > >>
> > > > > > >>
> > > > > > >> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <
> a...@confluent.io
> > >
> > > > > wrote:
> > > > > > >>
> > > > > > >> > Hi Dong,
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Thanks for considering my suggestions.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Based on your comments, I realized that my suggestion was
> not
> > > > > complete
> > > > > > >> with
> > > > > > >> > regard to KafkaConsumer API vs. consumer-broker protocol.
> > While
> > > I
> > > > > > >> propose
> > > > > > >> > to keep KafkaConsumer#seek() unchanged and take offset only,
> > the
> > > > > > >> underlying
> > > > > > >> > consumer will send the next FetchRequest() to broker with
> > offset
> > > > and
> > > > > > >> > leaderEpoch if it is known (based on leader epoch cache in
> > > > > consumer) —
> > > > > > >> note
> > > > > > >> > that this is different from the current KIP, which suggests
> to
> > > > > always
> > > > > > >> send
> > > > > > >> > unknown leader epoch after seek(). This way, if the consumer
> > > and a
> > > > > > >> broker
> > > > > > >> > agreed on the point of non-divergence, which is some
> {offset,
> > > > > > >> leaderEpoch}
> > > > > > >> > pair, the new leader which causes another truncation (even
> > > further
> > > > > > back)
> > > > > > >> > will be able to detect new divergence and restart the
> process
> > of
> > > > > > finding
> > > > > > >> > the new point of non-divergence. So, to answer your
> question,
> > If
> > > > the
> > > > > > >> > truncation happens just after the user calls
> > > > > > >> > KafkaConsumer#findOffsets(offset, leaderEpoch) followed by
> > > > > > seek(offset),
> > > > > > >> > the user will not seek to the wrong position without knowing
> > > that
> > > > > > >> > truncation has happened, because the consumer will get
> another
> > > > > > >> truncation
> > > > > > >> > error, and seek again.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > I am afraid, I did not understand your second question. Let
> me
> > > > > > >> summarize my
> > > > > > >> > suggestions again, and then give an example to hopefully
> make
> > my
> > > > > > >> > suggestions more clear. Also, the last part of my example
> > shows
> > > > how
> > > > > > the
> > > > > > >> > use-case in your first question will work. If it does not
> > answer
> > > > > your
> > > > > > >> > second question, would you mind clarifying? I am also
> focusing
> > > on
> > > > > the
> > > > > > >> case
> > > > > > >> > of a consumer having enough entries in the cache. The case
> of
> > > > > > restarting
> > > > > > >> > from committed offset either stored externally or internally
> > > will
> > > > > > >> probably
> > > > > > >> > need to be discussed more.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Let me summarize my suggestion again:
> > > > > > >> >
> > > > > > >> > 1) KafkaConsumer#seek() and KafkaConsumer#position() remains
> > > > > unchanged
> > > > > > >> >
> > > > > > >> > 2) New KafkaConsumer#findOffsets() takes {offset,
> leaderEpoch}
> > > > pair
> > > > > > per
> > > > > > >> > topic partition and returns offset per topic partition.
> > > > > > >> >
> > > > > > >> > 3) FetchRequest() to broker after KafkaConsumer#seek() will
> > > > contain
> > > > > > the
> > > > > > >> > offset set by seek and leaderEpoch that corresponds to the
> > > offset
> > > > > > based
> > > > > > >> on
> > > > > > >> > leader epoch cache in the consumer.
> > > > > > >> >
> > > > > > >> >
> > > > > > > >> > The rest of this e-mail is a long and contrived example with several
> > > > > > > >> > log truncations and unclean leader elections to illustrate the API
> > > > > > > >> > and your first use-case. Suppose we have three brokers. Initially,
> > > > > > > >> > Brokers A, B, and C each have one message at offset 0 with leader
> > > > > > > >> > epoch 0. Then, Broker A goes down for some time. Broker B becomes a
> > > > > > > >> > leader with epoch 1, and writes messages to offsets 1 and 2. Broker C
> > > > > > > >> > fetches offset 1, but before fetching offset 2, becomes a leader with
> > > > > > > >> > leader epoch 2 and writes a message at offset 2. Here is the state of
> > > > > > > >> > the brokers at this point:
> > > > > > >> >
> > > > > > > >> > Broker A:
> > > > > > > >> >   offset 0, epoch 0 <- leader
> > > > > > > >> >   goes down…
> > > > > > > >> >
> > > > > > > >> > Broker B:
> > > > > > > >> >   offset 0, epoch 0
> > > > > > > >> >   offset 1, epoch 1 <- leader
> > > > > > > >> >   offset 2, epoch 1
> > > > > > > >> >
> > > > > > > >> > Broker C:
> > > > > > > >> >   offset 0, epoch 0
> > > > > > > >> >   offset 1, epoch 1
> > > > > > > >> >   offset 2, epoch 2 <- leader
> > > > > > > >> >
> > > > > > >> >
> > > > > > >> > Before Broker C becomes a leader with leader epoch 2, the
> > > consumer
> > > > > > >> consumed
> > > > > > >> > the following messages from broker A and broker B:
> > > > > > >> >
> > > > > > >> > {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1},
> > {offset=2,
> > > > > > >> > leaderEpoch=1}.
> > > > > > >> >
> > > > > > >> > Consumer’s leader epoch cache at this point contains the
> > > following
> > > > > > >> entries:
> > > > > > >> >
> > > > > > >> > (leaderEpoch=0, startOffset=0)
> > > > > > >> >
> > > > > > >> > (leaderEpoch=1, startOffset=1)
> > > > > > >> >
> > > > > > >> > endOffset = 3
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Then, broker B becomes the follower of broker C, truncates
> and
> > > > > starts
> > > > > > >> > fetching from offset 2.
> > > > > > >> >
> > > > > > > >> > Consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets a
> > > > > > > >> > LOG_TRUNCATION error from broker C.
> > > > > > > >> >
> > > > > > > >> > In response, the client calls
> > > > > > > >> > KafkaConsumer#findOffsets(offset=3, leaderEpoch=1). The underlying
> > > > > > > >> > consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker C
> > > > > > > >> > responds with {leaderEpoch=1, endOffset=2}. So,
> > > > > > > >> > KafkaConsumer#findOffsets(offset=3, leaderEpoch=1) returns offset=2.
> > > > > > > >> >
> > > > > > > >> > In response, the consumer calls KafkaConsumer#seek(offset=2) followed
> > > > > > > >> > by poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to
> > > > > > > >> > broker C.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > I will continue with this example with the goal to answer
> your
> > > > first
> > > > > > >> > question about truncation just after findOffsets() followed
> by
> > > > > seek():
> > > > > > >> >
> > > > > > >> > Suppose, brokers B and C go down, and broker A comes up and
> > > > becomes
> > > > > a
> > > > > > >> > leader with leader epoch 3, and writes a message to offset
> 1.
> > > > > Suppose,
> > > > > > >> this
> > > > > > >> > happens before the consumer gets response from broker C to
> the
> > > > > > previous
> > > > > > >> > fetch request:  FetchRequest(offset=2, leaderEpoch=1).
> > > > > > >> >
> > > > > > > >> > Consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A,
> > > > > > > >> > which returns a LOG_TRUNCATION error, because broker A has leader
> > > > > > > >> > epoch 3 > the leader epoch in the FetchRequest, with starting
> > > > > > > >> > offset = 1 < offset 2 in the FetchRequest().
> > > > > > >> >
> > > > > > > >> > In response, the user calls
> > > > > > > >> > KafkaConsumer#findOffsets(offset=2, leaderEpoch=1). The underlying
> > > > > > > >> > consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker A
> > > > > > > >> > responds with {leaderEpoch=0, endOffset=1}; the underlying consumer
> > > > > > > >> > finds leaderEpoch = 0 in its cache with end offset == 1, which results
> > > > > > > >> > in KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning
> > > > > > > >> > offset = 1.
> > > > > > > >> >
> > > > > > > >> > In response, the user calls KafkaConsumer#seek(offset=1) followed by
> > > > > > > >> > poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to
> > > > > > > >> > broker A, which responds with the message at offset 1, leader epoch 3.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > I will think some more about consumers restarting from
> > committed
> > > > > > >> offsets,
> > > > > > >> > and send a follow up.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Thanks,
> > > > > > >> >
> > > > > > >> > Anna
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <
> lindon...@gmail.com>
> > > > > wrote:
> > > > > > >> >
> > > > > > >> > > Hey Anna,
> > > > > > >> > >
> > > > > > > >> > > Thanks much for the thoughtful reply. It makes sense to
> > > > > > > >> > > differentiate between "seeking to a message" and "seeking to a
> > > > > > > >> > > position". I have two questions here:
> > > > > > >> > >
> > > > > > >> > > - For "seeking to a message" use-case, with the proposed
> > > > approach
> > > > > > user
> > > > > > >> > > needs to call findOffset(offset, leaderEpoch) followed by
> > > > > > >> seek(offset).
> > > > > > >> > If
> > > > > > >> > > message truncation and message append happen immediately
> > after
> > > > > > >> > > findOffset(offset,
> > > > > > >> > > leaderEpoch) but before seek(offset), it seems that user
> > will
> > > > seek
> > > > > > to
> > > > > > >> the
> > > > > > >> > > wrong message without knowing the truncation has happened.
> > > Would
> > > > > > this
> > > > > > >> be
> > > > > > >> > a
> > > > > > >> > > problem?
> > > > > > >> > >
> > > > > > > >> > > - For "seeking to a position" use-case, it seems that there can be
> > > > > > > >> > > two positions, i.e. earliest and latest. So these two cases can be
> > > > > > > >> > > fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd().
> > > > > > > >> > > Then it seems that the user will only need to call position() and
> > > > > > > >> > > seek() for the "seeking to a message" use-case?
> > > > > > >> > >
> > > > > > >> > > Thanks,
> > > > > > >> > > Dong
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <
> > > > a...@confluent.io>
> > > > > > >> wrote:
> > > > > > >> > >
> > > > > > >> > > > Hi Jason and Dong,
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > I’ve been thinking about your suggestions and discussion
> > > > > regarding
> > > > > > >> > > > position(), seek(), and new proposed API.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > Here is my thought process why we should keep position()
> > and
> > > > > > seek()
> > > > > > >> API
> > > > > > >> > > > unchanged.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > I think we should separate {offset, leader epoch} that
> > > > uniquely
> > > > > > >> > > identifies
> > > > > > >> > > > a message from an offset that is a position. In some
> > cases,
> > > > > > offsets
> > > > > > >> > > > returned from position() could be actual consumed
> messages
> > > by
> > > > > this
> > > > > > >> > > consumer
> > > > > > >> > > > identified by {offset, leader epoch}. In other cases,
> > > > position()
> > > > > > >> > returns
> > > > > > >> > > > offset that was not actually consumed. Suppose, the user
> > > calls
> > > > > > >> > position()
> > > > > > >> > > > for the last offset. Suppose we return {offset, leader
> > > epoch}
> > > > of
> > > > > > the
> > > > > > >> > > > message currently in the log. Then, the message gets
> > > truncated
> > > > > > >> before
> > > > > > >> > > > consumer’s first poll(). It does not make sense for
> poll()
> > > to
> > > > > fail
> > > > > > >> in
> > > > > > >> > > this
> > > > > > >> > > > case, because the log truncation did not actually happen
> > > from
> > > > > the
> > > > > > >> > > consumer
> > > > > > >> > > > perspective. On the other hand, as the KIP proposes, it
> > > makes
> > > > > > sense
> > > > > > >> for
> > > > > > >> > > the
> > > > > > >> > > > committed() method to return {offset, leader epoch}
> > because
> > > > > those
> > > > > > >> > offsets
> > > > > > >> > > > represent actual consumed messages.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > The same argument applies to the seek() method — we are
> > not
> > > > > > seeking
> > > > > > >> to
> > > > > > >> > a
> > > > > > >> > > > message, we are seeking to a position.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > I like the proposal to add KafkaConsumer#findOffsets()
> > API.
> > > I
> > > > am
> > > > > > >> > assuming
> > > > > > >> > > > something like:
> > > > > > >> > > >
> > > > > > > >> > > > Map<TopicPartition, Long> findOffsets(
> > > > > > > >> > > >     Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)
> > > > > > >> > > >
> > > > > > > >> > > > Similar to seek() and position(), I think findOffsets() should
> > > > > > > >> > > > return offset without leader epoch, because what we want is the
> > > > > > > >> > > > offset that we think is closest to the non-divergent message from
> > > > > > > >> > > > the given consumed message. Until the consumer actually fetches
> > > > > > > >> > > > the message, we should not let the consumer store the leader
> > > > > > > >> > > > epoch for a message it did not consume.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > So, the workflow will be:
> > > > > > >> > > >
> > > > > > >> > > > 1) The user gets LogTruncationException with {offset,
> > leader
> > > > > epoch
> > > > > > >> of
> > > > > > >> > the
> > > > > > >> > > > previous message} (whatever we send with new
> FetchRecords
> > > > > > request).
> > > > > > >> > > >
> > > > > > >> > > > 2) offset = findOffsets(tp -> {offset, leader epoch})
> > > > > > >> > > >
> > > > > > >> > > > 3) seek(offset)
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > For the use-case where the users store committed offsets
> > > > > > externally:
> > > > > > >> > > >
> > > > > > >> > > > 1) Such users would have to track the leader epoch
> > together
> > > > with
> > > > > > an
> > > > > > >> > > offset.
> > > > > > >> > > > Otherwise, there is no way to detect later what leader
> > epoch
> > > > was
> > > > > > >> > > associated
> > > > > > >> > > > with the message. I think it’s reasonable to ask that
> from
> > > > users
> > > > > > if
> > > > > > >> > they
> > > > > > >> > > > want to detect log truncation. Otherwise, they will get
> > the
> > > > > > current
> > > > > > >> > > > behavior.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > > >> > > > If the users currently get an offset to be stored using
> > > > > > > >> > > > position(), I see two possibilities. First, they save the offset
> > > > > > > >> > > > returned from position(), which they call before poll(). In that
> > > > > > > >> > > > case, it would not be correct to store {offset, leader epoch} if
> > > > > > > >> > > > we changed position() to return {offset, leader epoch}, since the
> > > > > > > >> > > > actual fetched message could be different (from the example I
> > > > > > > >> > > > described earlier). So, it would be more correct to call
> > > > > > > >> > > > position() after poll(). However, the user already gets
> > > > > > > >> > > > ConsumerRecords at this point, from which the user can extract
> > > > > > > >> > > > {offset, leader epoch} of the last message.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > So, I like the idea of adding a helper method to
> > > > > ConsumerRecords,
> > > > > > as
> > > > > > >> > > Jason
> > > > > > >> > > > proposed, something like:
> > > > > > >> > > >
> > > > > > >> > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where
> > > > > > >> OffsetAndEpoch
> > > > > > >> > > is
> > > > > > >> > > > a data struct holding {offset, leader epoch}.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > > >> > > > In this case, we would advise the user to follow the workflow:
> > > > > > > >> > > > poll(), get {offset, leader epoch} from
> > > > > > > >> > > > ConsumerRecords#lastOffsetWithLeaderEpoch(), save offset and
> > > > > > > >> > > > leader epoch, process records.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > 2) When the user needs to seek to the last committed
> > offset,
> > > > > they
> > > > > > >> call
> > > > > > >> > > new
> > > > > > >> > > > findOffsets(saved offset, leader epoch), and then
> > > > seek(offset).
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > What do you think?
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > Thanks,
> > > > > > >> > > >
> > > > > > >> > > > Anna
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <
> > > lindon...@gmail.com>
> > > > > > >> wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Hey Jason,
> > > > > > >> > > > >
> > > > > > >> > > > > Thanks much for your thoughtful explanation.
> > > > > > >> > > > >
> > > > > > > >> > > > > Yes, the solution using findOffsets(offset, leaderEpoch) also
> > > > > > > >> > > > > works. The advantage of this solution is that it adds only one
> > > > > > > >> > > > > API instead of two APIs. The concern is that its usage seems a
> > > > > > > >> > > > > bit more clumsy for advanced users. More specifically, advanced
> > > > > > > >> > > > > users who store offsets externally will always need to call
> > > > > > > >> > > > > findOffsets() before calling seek(offset) during consumer
> > > > > > > >> > > > > initialization. And those advanced users will need to manually
> > > > > > > >> > > > > keep track of the leaderEpoch of the last ConsumerRecord.
> > > > > > >> > > > >
> > > > > > > >> > > > > The other solution, which may be more user-friendly for
> > > > > > > >> > > > > advanced users, is to add two APIs,
> > > > > > > >> > > > > `void seek(offset, leaderEpoch)` and
> > > > > > > >> > > > > `(offset, epoch) = offsetEpochs(topicPartition)`.
> > > > > > >> > > > >
> > > > > > >> > > > > I kind of prefer the second solution because it is easier
> > > > > > >> > > > > to use for advanced users. If we need to expose leaderEpoch
> > > > > > >> > > > > anyway to safely identify a message, it may be conceptually
> > > > > > >> > > > > simpler to expose it directly in seek(...) rather than
> > > > > > >> > > > > requiring one more translation using findOffsets(...). But I
> > > > > > >> > > > > am also OK with the first solution if other developers also
> > > > > > >> > > > > favor that one :)
> > > > > > >> > > > >
> > > > > > >> > > > > Thanks,
> > > > > > >> > > > > Dong
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > Hi Dong,
> > > > > > >> > > > > >
> > > > > > >> > > > > > Thanks, I've been thinking about your suggestions a bit.
> > > > > > >> > > > > > It is challenging to make this work given the current
> > > > > > >> > > > > > APIs. One of the difficulties is that we don't have an
> > > > > > >> > > > > > API to find the leader epoch for a given offset at the
> > > > > > >> > > > > > moment. So if the user does a seek to offset 5, then
> > > > > > >> > > > > > we'll need a new API to find the corresponding epoch in
> > > > > > >> > > > > > order to fulfill the new position() API. Potentially we
> > > > > > >> > > > > > could modify ListOffsets to enable finding the leader
> > > > > > >> > > > > > epoch, but I am not sure it is worthwhile. Perhaps it is
> > > > > > >> > > > > > reasonable for advanced usage to expect that the epoch
> > > > > > >> > > > > > information, if needed, will be extracted from the
> > > > > > >> > > > > > records directly? It might make sense to expose a helper
> > > > > > >> > > > > > in `ConsumerRecords` to make this a little easier though.
> > > > > > >> > > > > >
> > > > > > >> > > > > > Alternatively, if we think it is important to have this
> > > > > > >> > > > > > information exposed directly, we could create batch APIs
> > > > > > >> > > > > > to solve the naming problem. For example:
> > > > > > >> > > > > >
> > > > > > >> > > > > > Map<TopicPartition, OffsetAndEpoch> positions();
> > > > > > >> > > > > > void seek(Map<TopicPartition, OffsetAndEpoch> positions);
> > > > > > >> > > > > >
> > > > > > >> > > > > > However, I'm actually leaning toward leaving the seek()
> > > > > > >> > > > > > and position() APIs unchanged. Instead, we can add a new
> > > > > > >> > > > > > API to search for offset by timestamp or by offset/leader
> > > > > > >> > > > > > epoch. Let's say we call it `findOffsets`. If the user
> > > > > > >> > > > > > hits a log truncation error, they can use this API to
> > > > > > >> > > > > > find the closest offset and then do a seek(). At the same
> > > > > > >> > > > > > time, we deprecate the `offsetsForTimes` APIs. We now
> > > > > > >> > > > > > have two use cases which require finding offsets, so I
> > > > > > >> > > > > > think we should make this API general and leave the door
> > > > > > >> > > > > > open for future extensions.
> > > > > > >> > > > > >
> > > > > > >> > > > > > By the way, I'm unclear about the desire to move part of
> > > > > > >> > > > > > this functionality to AdminClient. Guozhang suggested
> > > > > > >> > > > > > this previously, but I think it only makes sense for
> > > > > > >> > > > > > cross-cutting capabilities such as topic creation. If we
> > > > > > >> > > > > > have an API which is primarily useful to consumers, then
> > > > > > >> > > > > > I think that's where it should be exposed. The
> > > > > > >> > > > > > AdminClient also has its own API integrity and should not
> > > > > > >> > > > > > become a dumping ground for advanced use cases. I'll
> > > > > > >> > > > > > update the KIP with the `findOffsets` API suggested above
> > > > > > >> > > > > > and we can see if it does a good enough job of keeping
> > > > > > >> > > > > > the API simple for common cases.
> > > > > > >> > > > > >
> > > > > > >> > > > > > Thanks,
> > > > > > >> > > > > > Jason
> > > > > > >> > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >> > > > > >
> > > > > > >> > > > > > > Hey Jason,
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Regarding seek(...), it seems that we want an API for
> > > > > > >> > > > > > > users to initialize the consumer with (offset,
> > > > > > >> > > > > > > leaderEpoch) and that API should allow throwing
> > > > > > >> > > > > > > PartitionTruncationException. Suppose we agree on this,
> > > > > > >> > > > > > > then seekToNearest() is not sufficient because it will
> > > > > > >> > > > > > > always swallow PartitionTruncationException. Here we
> > > > > > >> > > > > > > have two options. The first option is to add the API
> > > > > > >> > > > > > > offsetsForLeaderEpochs() to translate (leaderEpoch,
> > > > > > >> > > > > > > offset) to offset. The second option is to add
> > > > > > >> > > > > > > seek(offset, leaderEpoch). It seems that the second
> > > > > > >> > > > > > > option may be simpler because it makes it clear that
> > > > > > >> > > > > > > (offset, leaderEpoch) will be used to identify the
> > > > > > >> > > > > > > consumer's position in a partition. And the user only
> > > > > > >> > > > > > > needs to handle PartitionTruncationException from
> > > > > > >> > > > > > > poll(). In comparison the first option seems a bit
> > > > > > >> > > > > > > harder to use because the user also has to handle the
> > > > > > >> > > > > > > PartitionTruncationException if offsetsForLeaderEpochs()
> > > > > > >> > > > > > > returns a different offset from the user-provided
> > > > > > >> > > > > > > offset. What do you think?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > If we decide to add the API seek(offset, leaderEpoch),
> > > > > > >> > > > > > > then we can decide whether and how to add an API to
> > > > > > >> > > > > > > translate (offset, leaderEpoch) to offset. It seems
> > > > > > >> > > > > > > that this API will be needed by advanced users who
> > > > > > >> > > > > > > don't want auto offset reset (so that they can be
> > > > > > >> > > > > > > notified) but still want to reset the offset to the
> > > > > > >> > > > > > > closest one. For those users it probably makes sense to
> > > > > > >> > > > > > > only have the API in AdminClient. offsetsForTimes()
> > > > > > >> > > > > > > seems like a common API that will be needed by users of
> > > > > > >> > > > > > > the consumer in general, so it may be more reasonable
> > > > > > >> > > > > > > for it to stay in the consumer API. I don't have a
> > > > > > >> > > > > > > strong opinion on whether offsetsForTimes() should be
> > > > > > >> > > > > > > replaced by an API in AdminClient.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Though (offset, leaderEpoch) is needed to uniquely
> > > > > > >> > > > > > > identify a message in general, it is only needed for
> > > > > > >> > > > > > > advanced users who have turned on unclean leader
> > > > > > >> > > > > > > election, need to use seek(..), and don't want auto
> > > > > > >> > > > > > > offset reset. Most other users probably just want to
> > > > > > >> > > > > > > enable auto offset reset and store offsets in Kafka.
> > > > > > >> > > > > > > Thus we might want to keep the existing offset-only
> > > > > > >> > > > > > > APIs (e.g. seek() and position()) for most users while
> > > > > > >> > > > > > > adding new APIs for advanced users. And yes, it seems
> > > > > > >> > > > > > > that we need a new name for position().
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Though I think we need new APIs to carry the new
> > > > > > >> > > > > > > information (e.g. leaderEpoch), I am not very sure what
> > > > > > >> > > > > > > that should look like. One possible option is those
> > > > > > >> > > > > > > APIs in KIP-232. Another option is something like this:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > `````
> > > > > > >> > > > > > > class OffsetEpochs {
> > > > > > >> > > > > > >   long offset;
> > > > > > >> > > > > > >   int leaderEpoch;
> > > > > > >> > > > > > >   int partitionEpoch;   // This may be needed later as discussed in KIP-232
> > > > > > >> > > > > > >   ... // Hopefully these are all we need to identify message in Kafka.
> > > > > > >> > > > > > >       // But if we need more then we can add new fields in this class.
> > > > > > >> > > > > > > }
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > OffsetEpochs offsetEpochs(TopicPartition);
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > void seek(TopicPartition, OffsetEpochs);
> > > > > > >> > > > > > > `````
> > > > > > >> > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks,
> > > > > > >> > > > > > > Dong
> > > > > > >> > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > Hey Dong,
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Thanks for the feedback. The first three points are
> > > > > > >> > > > > > > > easy:
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 1. Yes, we should be consistent.
> > > > > > >> > > > > > > > 2. Yes, I will add this.
> > > > > > >> > > > > > > > 3. Yes, I think we should document the changes to the
> > > > > > >> > > > > > > > committed offset schema. I meant to do this, but it
> > > > > > >> > > > > > > > slipped my mind.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > The latter questions are tougher. One option I was
> > > > > > >> > > > > > > > considering is to have only `offsetsForLeaderEpochs`
> > > > > > >> > > > > > > > exposed from the consumer and to drop the new seek()
> > > > > > >> > > > > > > > API. That seems more consistent with the current use
> > > > > > >> > > > > > > > of `offsetsForTimes` (we don't have a separate
> > > > > > >> > > > > > > > `seekToTimestamp` API). An alternative might be to
> > > > > > >> > > > > > > > take a page from the AdminClient API and add a new
> > > > > > >> > > > > > > > method to generalize offset lookup. For example, we
> > > > > > >> > > > > > > > could have `lookupOffsets(LookupOptions)`. We could
> > > > > > >> > > > > > > > then deprecate `offsetsForTimes` and this would open
> > > > > > >> > > > > > > > the door for future extensions without needing new
> > > > > > >> > > > > > > > APIs.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > The case of position() is a little more annoying. It
> > > > > > >> > > > > > > > would have been better had we let this return an
> > > > > > >> > > > > > > > object so that it is easier to extend. This is the
> > > > > > >> > > > > > > > only reason I didn't add the API to the KIP. Maybe we
> > > > > > >> > > > > > > > should bite the bullet and fix this now?
> > > > > > >> > > > > > > > Unfortunately we'll have to come up with a new name.
> > > > > > >> > > > > > > > Maybe `currentPosition`?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Thoughts?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > -Jason
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > > Regarding points 4) and 5) above, the motivation for
> > > > > > >> > > > > > > > > the alternative APIs is that, if we decide that
> > > > > > >> > > > > > > > > leaderEpoch is as important as the offset in
> > > > > > >> > > > > > > > > identifying a message, then it may be reasonable to
> > > > > > >> > > > > > > > > always specify it wherever an offset is currently
> > > > > > >> > > > > > > > > required in the consumer API to identify a message,
> > > > > > >> > > > > > > > > e.g. position(), seek(). For example, since we allow
> > > > > > >> > > > > > > > > the user to retrieve the offset using position()
> > > > > > >> > > > > > > > > instead of asking the user to keep track of the
> > > > > > >> > > > > > > > > offset of the latest ConsumerRecord, maybe it would
> > > > > > >> > > > > > > > > be more consistent for the user to also retrieve the
> > > > > > >> > > > > > > > > leaderEpoch using position()?
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > > Hey Jason,
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Thanks for the update. It looks pretty good. Just
> > > > > > >> > > > > > > > > > some minor comments below:
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > 1) The KIP adds a new error code "LOG_TRUNCATION"
> > > > > > >> > > > > > > > > > and a new exception TruncatedPartitionException.
> > > > > > >> > > > > > > > > > Can we make the name more consistent, e.g.
> > > > > > >> > > > > > > > > > LogTruncationException?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > 2) Do we need to add UnknownLeaderEpochException
> > > > > > >> > > > > > > > > > as part of the API change?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > 3) Not sure if the offset topic schema is also
> > > > > > >> > > > > > > > > > public API. If so, maybe we should also include
> > > > > > >> > > > > > > > > > the schema change in the API?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > 4) For users who store offsets externally,
> > > > > > >> > > > > > > > > > currently they get the offset using position(..),
> > > > > > >> > > > > > > > > > store the offset externally, and use seek(..) to
> > > > > > >> > > > > > > > > > initialize the consumer next time. After this KIP
> > > > > > >> > > > > > > > > > they will need to store and use the leaderEpoch
> > > > > > >> > > > > > > > > > together with the offset. Should we also update
> > > > > > >> > > > > > > > > > the API so that users can also get the leaderEpoch
> > > > > > >> > > > > > > > > > from position(...)? Not sure if it is OK to ask
> > > > > > >> > > > > > > > > > users to track the latest leaderEpoch of
> > > > > > >> > > > > > > > > > ConsumerRecord by themselves.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > 5) Also for users who store offsets externally,
> > > > > > >> > > > > > > > > > they need to call seek(..) with leaderEpoch to
> > > > > > >> > > > > > > > > > initialize the consumer. With the current KIP
> > > > > > >> > > > > > > > > > users need to call seekToNearest(), whose name
> > > > > > >> > > > > > > > > > suggests that the final position may be different
> > > > > > >> > > > > > > > > > from what was requested. However, if users want to
> > > > > > >> > > > > > > > > > avoid auto offset reset and be notified explicitly
> > > > > > >> > > > > > > > > > when there is log truncation, then seekToNearest()
> > > > > > >> > > > > > > > > > probably does not help here. Would it make sense
> > > > > > >> > > > > > > > > > to replace seekToNearest() with seek(offset,
> > > > > > >> > > > > > > > > > leaderEpoch) +
> > > > > > >> > > > > > > > > > AdminClient.offsetsForLeaderEpochs(...)?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Thanks,
> > > > > > >> > > > > > > > > > Dong
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > >> Hey Guozhang,
> > > > > > >> > > > > > > > > >>
> > > > > > >> > > > > > > > > >> That's fair. In fact, perhaps we do not need this
> > > > > > >> > > > > > > > > >> API at all. We already have the new seek() in this
> > > > > > >> > > > > > > > > >> KIP which can do the lookup based on epoch for
> > > > > > >> > > > > > > > > >> this use case. I guess we should probably call it
> > > > > > >> > > > > > > > > >> seekToNearest() though to make it clear that the
> > > > > > >> > > > > > > > > >> final position may be different from what was
> > > > > > >> > > > > > > > > >> requested.
> > > > > > >> > > > > > > > > >>
> > > > > > >> > > > > > > > > >> Thanks,
> > > > > > >> > > > > > > > > >> Jason
> > > > > > >> > > > > > > > > >>
> > > > > > >> > > > > > > > > >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >> > > > > > > > > >>
> > > > > > >> > > > > > > > > >> > Hi Jason,
> > > > > > >> > > > > > > > > >> >
> > > > > > >> > > > > > > > > >> > I think it is less worthwhile to add
> > > > > > >> > > > > > > > > >> > KafkaConsumer#offsetsForLeaderEpochs, since
> > > > > > >> > > > > > > > > >> > probably only very advanced users are aware of
> > > > > > >> > > > > > > > > >> > the leaderEpoch, and hence ever care to use it
> > > > > > >> > > > > > > > > >> > anyways. It is more like an admin client
> > > > > > >> > > > > > > > > >> > operation than a consumer client operation: if
> > > > > > >> > > > > > > > > >> > the motivation is to facilitate a customized
> > > > > > >> > > > > > > > > >> > reset policy, maybe adding it as
> > > > > > >> > > > > > > > > >> > AdminClient#offsetsForLeaderEpochs is better, as
> > > > > > >> > > > > > > > > >> > it is not an aggressive assumption that such
> > > > > > >> > > > > > > > > >> > advanced users are willing to use some admin
> > > > > > >> > > > > > > > > >> > client to get further information?
> > > > > > >> > > > > > > > > >> >
> > > > > > >> > > > > > > > > >> >
> > > > > > >> > > > > > > > > >> > Guozhang
> > > > > > >> > > > > > > > > >> >
> > > > > > >> > > > > > > > > >> >
> > > > > > >> > > > > > > > > >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > >> > > > > > > > > >> >
> > > > > > >> > > > > > > > > >> > > Thanks for the feedback. I've updated the KIP.
> > > > > > >> > > > > > > > > >> > > Specifically I removed the "closest" reset
> > > > > > >> > > > > > > > > >> > > option and the proposal to reset by timestamp
> > > > > > >> > > > > > > > > >> > > when the precise truncation point cannot be
> > > > > > >> > > > > > > > > >> > > determined. Instead, I proposed that we always
> > > > > > >> > > > > > > > > >> > > reset using the nearest epoch when a reset
> > > > > > >> > > > > > > > > >> > > policy is defined (either "earliest" or
> > > > > > >> > > > > > > > > >> > > "latest"). Does that sound reasonable?
> > > > > > >> > > > > > > > > >> > >
> > > > > > >> > > > > > > > > >> > > One thing I am still debating is whether it
> > > > > > >> > > > > > > > > >> > > would be better to have a separate API to find
> > > > > > >> > > > > > > > > >> > > the closest offset using the leader epoch. In
> > > > > > >> > > > > > > > > >> > > the current KIP, I suggested piggybacking this
> > > > > > >> > > > > > > > > >> > > information on an exception, but I'm beginning
> > > > > > >> > > > > > > > > >> > > to think it would be better not to hide the
> > > > > > >> > > > > > > > > >> > > lookup. It is awkward to implement since it
> > > > > > >> > > > > > > > > >> > > means delaying the exception and the API may
> > > > > > >> > > > > > > > > >> > > actually be useful when customizing reset
> > > > > > >> > > > > > > > > >> > > logic if no auto reset policy
> > > > > > >> > > > > >
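
A rough, self-contained sketch of the findOffsets(saved offset, leader epoch) followed by seek(offset) flow discussed in the quoted thread, for readers who want the lookup spelled out. It does not use the real Kafka client: `EpochLog`, `findOffset`, and the epoch-to-start-offset map are hypothetical names made up for illustration, and the rule "clamp the saved offset to the end offset of the saved epoch" is an assumption about how such a lookup could behave, not something the KIP text quoted here specifies.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of one partition's log; not a Kafka class.
public class EpochLog {
    // Leader epoch -> first offset written in that epoch.
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private final long logEndOffset;

    public EpochLog(Map<Integer, Long> epochStartOffsets, long logEndOffset) {
        this.epochStartOffsets.putAll(epochStartOffsets);
        this.logEndOffset = logEndOffset;
    }

    // End offset (exclusive) of the largest known epoch <= the requested one.
    long endOffsetForEpoch(int epoch) {
        Integer known = epochStartOffsets.floorKey(epoch);
        if (known == null) {
            // The requested epoch predates everything in the log:
            // fall back to the start of the oldest known epoch.
            return epochStartOffsets.isEmpty()
                    ? 0L
                    : epochStartOffsets.firstEntry().getValue();
        }
        Integer next = epochStartOffsets.higherKey(known);
        return next == null ? logEndOffset : epochStartOffsets.get(next);
    }

    // Assumed lookup rule: if the saved offset lies beyond the end of the
    // saved epoch, the log was truncated, so return the epoch's end offset.
    public long findOffset(long savedOffset, int savedEpoch) {
        return Math.min(savedOffset, endOffsetForEpoch(savedEpoch));
    }

    public static void main(String[] args) {
        // Epoch 0 starts at offset 0, epoch 1 at 10, epoch 3 at 15; log ends at 20.
        // Epoch 2 is missing: its records were truncated after a leader change.
        EpochLog log = new EpochLog(Map.of(0, 0L, 1, 10L, 3, 15L), 20L);

        long afterTruncation = log.findOffset(18L, 2); // saved under the lost epoch
        long stillValid = log.findOffset(12L, 1);      // saved under a surviving epoch

        // A real consumer would now call seek(partition, offset) with these values.
        System.out.println("seek targets: " + afterTruncation + ", " + stillValid);
    }
}
```

With the sample data, a consumer that stored (offset 18, epoch 2) externally would be pointed at offset 15, while (offset 12, epoch 1) is returned unchanged. This is the extra translation step that the thread weighs against a direct seek(offset, leaderEpoch).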
