Hey Dong,

Thanks for the detailed review. Responses below:

1/2: Thanks for noticing the inconsistency. Would it be reasonable to
simply call it LeaderEpoch for both APIs?

3: I agree it should be a map. I will update.

4: Fair point. I think we should always be able to identify an offset.
Let's remove the Optional for now and reconsider if we find an unhandled
case during implementation.
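
To make sure we're picturing the same thing for 3 and 4, here is a rough Java
sketch of the accessor I have in mind (the superclass and names here are
illustrative only, nothing final):

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only: superclass and names are placeholders.
public class LogTruncationException extends KafkaException {

    // One divergent offset (and, via OffsetAndMetadata, leader epoch) per
    // partition on which truncation was detected.
    private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;

    public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
        super("Detected log truncation for partitions " + divergentOffsets.keySet());
        this.divergentOffsets = divergentOffsets;
    }

    // No Optional: per point 4, we expect to always be able to identify an
    // offset of divergence for each affected partition.
    public Map<TopicPartition, OffsetAndMetadata> truncationOffsets() {
        return Collections.unmodifiableMap(divergentOffsets);
    }
}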

5: Yeah, I was thinking about this. The two error codes could be handled
similarly, so we might merge them. Mainly I was thinking that it will be
useful for consumers/replicas to know whether they are ahead or behind the
leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH, it need not
refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH error, it
could just stop fetching and await the LeaderAndIsr request that it is
missing. It probably also makes debugging a little bit easier. I guess I'm
a bit inclined to keep both error codes, but I'm open to reconsideration if
you feel strongly. Another point to consider is whether we should continue
using NOT_LEADER_FOR_PARTITION if a follower receives an unexpected fetch.
The leader epoch would be different in this case so we could use one of the
invalid epoch error codes instead since they contain more information.
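
To make the distinction concrete, here is a toy sketch of the handling I'm
describing (the enum and method names are invented for the example; this is
not the actual client or replica code):

// Toy example only: illustrates why the two codes would be handled
// differently by a fetcher.
public class EpochErrorExample {

    enum EpochError { FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    static void handle(EpochError error) {
        switch (error) {
            case FENCED_LEADER_EPOCH:
                // The fetcher's epoch is older than the leader's. A replica can
                // stop fetching and await the LeaderAndIsr request it is
                // missing; a consumer would refresh metadata.
                System.out.println("behind the leader: wait for the new epoch");
                break;
            case UNKNOWN_LEADER_EPOCH:
                // The fetcher's epoch is newer than what the broker knows, so
                // the broker is the stale party. No metadata refresh is needed;
                // just retry the fetch after a backoff.
                System.out.println("ahead of the broker: retry without refreshing metadata");
                break;
        }
    }

    public static void main(String[] args) {
        handle(EpochError.FENCED_LEADER_EPOCH);
        handle(EpochError.UNKNOWN_LEADER_EPOCH);
    }
}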

6: I agree the name is not ideal in that scenario. What if we overloaded
`seek`?
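
Roughly along these lines (the interface name is just for illustration; the
real change would be an overload on the consumer):

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch only: shows the two signatures side by side.
public interface SeekOverloadSketch {

    // Existing API: seek to a plain position.
    void seek(TopicPartition partition, long offset);

    // Proposed overload: seek using the offset plus leader epoch (carried in
    // OffsetAndMetadata) of the last consumed or externally committed record,
    // so the consumer can check for truncation after a leader change.
    void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
}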

7: Sure, I will mention this.


Thanks,
Jason

On Fri, Jul 27, 2018 at 6:17 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jason,
>
> Thanks for the update! I agree with the current proposal overall. I have
> some minor comments related to naming etc.
>
> 1) I don't feel strongly about this and will just leave it here for
> discussion. Would it be better to rename "CurrentLeaderEpoch" to
> "ExpectedLeaderEpoch" for the new field in the OffsetsForLeaderEpochRequest?
> The reason is that "CurrentLeaderEpoch" may not necessarily be the true
> current leader epoch if the consumer has stale metadata.
> "ExpectedLeaderEpoch" conveys that this epoch is what the consumer expects
> to see on the broker, which may or may not be the true value.
>
> 2) Currently we add the field "LeaderEpoch" to FetchRequest and the field
> "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that both
> fields are compared with the leaderEpoch in the broker, would it be better
> to give them the same name?
>
> 3) Currently LogTruncationException.truncationOffset() returns
> Optional<OffsetAndMetadata> to the user. Should it return
> Optional<Map<TopicPartition, OffsetAndMetadata>> to handle the scenario
> where the leaderEpoch of multiple partitions differs from the leaderEpoch
> on the broker?
>
> 4) Currently LogTruncationException.truncationOffset() returns an Optional
> value. Could you explain a bit more when it will return Optional.empty()? I
> am trying to understand whether it is simpler and reasonable to replace
> Optional.empty() with OffsetMetadata(offset=last_fetched_offset,
> leaderEpoch=-1).
>
> 5) Do we also need to add a new retriable exception for the error code
> FENCED_LEADER_EPOCH? And do we need to define both FENCED_LEADER_EPOCH and
> UNKNOWN_LEADER_EPOCH? It seems that the current KIP uses these two error
> codes in the same way, and the exception for these two error codes is not
> exposed to the user. Maybe we should combine them into one error, e.g.
> INVALID_LEADER_EPOCH?
>
> 6) For users who have turned off auto offset reset, when consumer.poll()
> throws LogTruncationException, it seems that the user will most likely call
> seekToCommitted(offset, leaderEpoch), where offset and leaderEpoch are
> obtained from LogTruncationException.truncationOffset(). In this case, the
> offset used is not committed, which is inconsistent with the method name
> seekToCommitted(...). Would it be better to rename the method to e.g.
> seekToLastConsumedMessage()?
>
> 7) Per point 3 in Jun's comment, would it be useful to explicitly specify
> in the KIP that we will log the truncation event if the user has turned on
> the auto offset reset policy?
>
>
> Thanks,
> Dong
>
>
> On Fri, Jul 27, 2018 at 12:39 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Thanks Anna, you are right on both points. I updated the KIP.
> >
> > -Jason
> >
> > On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner <a...@confluent.io> wrote:
> >
> > > Hi Jason,
> > >
> > > Thanks for the update. I agree with the current proposal.
> > >
> > > Two minor comments:
> > > 1) In “API Changes” section, first paragraph says that “users can catch
> > the
> > > more specific exception type and use the new `seekToNearest()` API
> > defined
> > > below.” Since LogTruncationException “will include the partitions that
> > > were truncated and the offset of divergence”, shouldn’t the client use
> > > seek(offset) to seek to the offset of divergence in response to the
> > > exception?
> > > 2) In “Protocol Changes” section, OffsetsForLeaderEpoch subsection says
> > > “Note
> > > that consumers will send a sentinel value (-1) for the current epoch
> and
> > > the broker will simply disregard that validation.” Is that still true
> > with
> > > MetadataResponse containing leader epoch?
> > >
> > > Thanks,
> > > Anna
> > >
> > > On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I have made some updates to the KIP. As many of you know, a side
> > project
> > > of
> > > > mine has been specifying the Kafka replication protocol in TLA. You
> can
> > > > check out the code here if you are interested:
> > > > https://github.com/hachikuji/kafka-specification. In addition to
> > > > uncovering
> > > > a couple unknown bugs in the replication protocol (e.g.
> > > > https://issues.apache.org/jira/browse/KAFKA-7128), this has helped
> me
> > > > validate the behavior in this KIP. In fact, the original version I
> > > proposed
> > > > had a weakness. I initially suggested letting the leader validate the
> > > > expected epoch at the fetch offset. This made sense for the consumer
> in
> > > the
> > > > handling of unclean leader election, but it was not strong enough to
> > > > protect the follower in all cases. In order to make advancement of
> the
> > > high
> > > > watermark safe, for example, the leader actually needs to be sure
> that
> > > > every follower in the ISR matches its own epoch.
> > > >
> > > > I attempted to fix this problem by treating the epoch in the fetch
> > > request
> > > > slightly differently for consumers and followers. For consumers, it
> > would
> > > > be the expected epoch of the record at the fetch offset, and the
> leader
> > > > would raise a LOG_TRUNCATION error if the expectation failed. For
> > > > followers, it would be the current epoch and the leader would require
> > > that
> > > > it match its own epoch. This was unsatisfying both because of the
> > > > inconsistency in behavior and because the consumer was left with the
> > > weaker
> > > > fencing that we already knew was insufficient for the replicas.
> > > Ultimately
> > > > I decided that we should make the behavior consistent and that meant
> > that
> > > > the consumer needed to act more like a following replica. Instead of
> > > > checking for truncation while fetching, the consumer should check for
> > > > truncation after leader changes. After checking for truncation, the
> > > > consumer can then use the current epoch when fetching and get the
> > > stronger
> > > > protection that it provides. What this means is that the Metadata API
> > > must
> > > > include the current leader epoch. Given the problems we have had
> around
> > > > stale metadata and how challenging they have been to debug, I'm
> > convinced
> > > > that this is a good idea in any case and it resolves the inconsistent
> > > > behavior in the Fetch API. The downside is that there will be some
> > > > additional overhead upon leader changes, but I don't think it is a
> > major
> > > > concern since leader changes are rare and the OffsetForLeaderEpoch
> > > request
> > > > is cheap.
> > > >
> > > > This approach leaves the door open for some interesting follow up
> > > > improvements. For example, now that we have the leader epoch in the
> > > > Metadata request, we can implement similar fencing for the Produce
> API.
> > > And
> > > > now that the consumer can reason about truncation, we could consider
> > > having
> > > > a configuration to expose records beyond the high watermark. This
> would
> > > let
> > > > users trade lower end-to-end latency for weaker durability semantics.
> > It
> > > is
> > > > sort of like having an acks=0 option for the consumer. Neither of
> these
> > > > options are included in this KIP, I am just mentioning them as
> > potential
> > > > work for the future.
> > > >
> > > > Finally, based on the discussion in this thread, I have added the
> > > > seekToCommitted API for the consumer. Please take a look and let me
> > know
> > > > what you think.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > The proposed API seems reasonable to me too. Could you please also
> > > update
> > > > > the wiki page (
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
> > > > > with a section, say "workflow", on how the proposed API will be
> co-used
> > > > with
> > > > > others to:
> > > > >
> > > > > 1. consumer callers handling a LogTruncationException.
> > > > > 2. consumer internals for handling a retriable
> > > > UnknownLeaderEpochException.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner <a...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > Hi Jason,
> > > > > >
> > > > > >
> > > > > > I also like your proposal and agree that
> > > > KafkaConsumer#seekToCommitted()
> > > > > > is
> > > > > > more intuitive as a way to initialize both consumer's position
> and
> > > its
> > > > > > fetch state.
> > > > > >
> > > > > >
> > > > > > My understanding is that KafkaConsumer#seekToCommitted() is purely
> > for
> > > > > > clients
> > > > > > who store their offsets externally, right? And we are still going
> > to
> > > > > > add KafkaConsumer#findOffsets()
> > > > > > in this KIP as we discussed, so that the client can handle
> > > > > > LogTruncationException?
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Anna
> > > > > >
> > > > > >
> > > > > > On Thu, Jul 12, 2018 at 3:57 PM Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hey Jason,
> > > > > > >
> > > > > > > It is a great summary. The solution sounds good. I might have
> > minor
> > > > > > > comments regarding the method name. But we can discuss those
> minor
> > > > > points
> > > > > > > later after we reach consensus on the high level API.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson <
> > > > ja...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Anna and Dong,
> > > > > > > >
> > > > > > > > Thanks a lot for the great discussion. I've been hanging
> back a
> > > bit
> > > > > > > because
> > > > > > > > honestly the best option hasn't seemed clear. I agree with
> > Anna's
> > > > > > general
> > > > > > > > observation that there is a distinction between the position
> of
> > > the
> > > > > > > > consumer and its fetch state up to that position. If you
> think
> > > > about
> > > > > > it,
> > > > > > > a
> > > > > > > > committed offset actually represents both of these. The
> > metadata
> > > is
> > > > > > used
> > > > > > > to
> > > > > > > > initialize the state of the consumer application and the
> offset
> > > > > > > initializes
> > > > > > > > the position. Additionally, we are extending the offset
> commit
> > in
> > > > > this
> > > > > > > KIP
> > > > > > > > to also include the last epoch fetched by the consumer, which
> > is
> > > > used
> > > > > > to
> > > > > > > > initialize the internal fetch state. Of course if you do an
> > > > arbitrary
> > > > > > > > `seek` and immediately commit offsets, then there won't be a
> > last
> > > > > epoch
> > > > > > > to
> > > > > > > > commit. This seems intuitive since there is no fetch state in
> > > this
> > > > > > case.
> > > > > > > We
> > > > > > > > only commit fetch state when we have it.
> > > > > > > >
> > > > > > > > So if we think about a committed offset as initializing both
> > the
> > > > > > > consumer's
> > > > > > > > position and its fetch state, then the gap in the API is
> > > evidently
> > > > > that
> > > > > > > we
> > > > > > > > don't have a way to initialize the consumer to a committed
> > > offset.
> > > > We
> > > > > > do
> > > > > > > it
> > > > > > > > implicitly of course for offsets stored in Kafka, but since
> > > > external
> > > > > > > > storage is a use case we support, then we should have an
> > explicit
> > > > API
> > > > > > as
> > > > > > > > well. Perhaps something like this:
> > > > > > > >
> > > > > > > > seekToCommitted(TopicPartition, OffsetAndMetadata)
> > > > > > > >
> > > > > > > > In this KIP, we are proposing to allow the
> `OffsetAndMetadata`
> > > > object
> > > > > > to
> > > > > > > > include the leader epoch, so I think this would have the same
> > > > effect
> > > > > as
> > > > > > > > Anna's suggested `seekToRecord`. But perhaps it is a more
> > natural
> > > > fit
> > > > > > > given
> > > > > > > > the current API? Furthermore, if we find a need for
> additional
> > > > > metadata
> > > > > > > in
> > > > > > > > the offset commit API in the future, then we will just need
> to
> > > > modify
> > > > > > the
> > > > > > > > `OffsetAndMetadata` object and we will not need a new `seek`
> > API.
> > > > > > > >
> > > > > > > > With this approach, I think then we can leave the `position`
> > API
> > > as
> > > > > it
> > > > > > > is.
> > > > > > > > The position of the consumer is still just the next expected
> > > fetch
> > > > > > > offset.
> > > > > > > > If a user needs to record additional state based on previous
> > > fetch
> > > > > > > > progress, then they would use the result of the previous
> fetch
> > to
> > > > > > obtain
> > > > > > > > it. This makes the dependence on fetch progress explicit. I
> > think
> > > > we
> > > > > > > could
> > > > > > > > make this a little more convenient with a helper in the
> > > > > > > `ConsumerRecords`
> > > > > > > > object, but I think that's more of a nice-to-have.
> > > > > > > >
> > > > > > > > Thoughts?
> > > > > > > >
> > > > > > > > By the way, I have been iterating a little bit on the replica
> > > side
> > > > of
> > > > > > > this
> > > > > > > > KIP. My initial proposal in fact did not have strong enough
> > > fencing
> > > > > to
> > > > > > > > protect all of the edge cases. I believe the current proposal
> > > fixes
> > > > > the
> > > > > > > > problems, but I am still verifying the model.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jason
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin <
> > lindon...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Anna,
> > > > > > > > >
> > > > > > > > > Thanks much for the explanation. Approach 1 also sounds
> good
> > to
> > > > > me. I
> > > > > > > > think
> > > > > > > > > findOffsets() is useful for users who don't use automatic
> > > offset
> > > > > > reset
> > > > > > > > > policy.
> > > > > > > > >
> > > > > > > > > Just one more question. Since users who store offsets
> > > externally
> > > > > need
> > > > > > > to
> > > > > > > > > provide leaderEpoch to findOffsets(...), do we need an
> extra
> > > API
> > > > > for
> > > > > > > user
> > > > > > > > > to get both offset and leaderEpoch, e.g. recordPosition()?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Dong
> > > > > > > > >
> > > > > > > > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner <
> > > > a...@confluent.io>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Dong,
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > What I called “not covering all use cases” is what you
> call
> > > > > > > best-effort
> > > > > > > > > > (not guaranteeing some corner cases). I think we are on
> the
> > > > same
> > > > > > page
> > > > > > > > > here.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I wanted to be clear in the API whether the consumer
> seeks
> > > to a
> > > > > > > > position
> > > > > > > > > > (offset) or to a record (offset, leader epoch). The only
> > > > use-case
> > > > > > of
> > > > > > > > > > seeking to a record is seeking to a committed offset for
> a
> > > user
> > > > > who
> > > > > > > > > stores
> > > > > > > > > > committed offsets externally. (Unless users find some
> other
> > > > > reason
> > > > > > to
> > > > > > > > > seek
> > > > > > > > > > to a record.) I thought it was possible to provide this
> > > > > > functionality
> > > > > > > > > with
> > > > > > > > > > findOffset(offset, leader epoch) followed by a
> > seek(offset).
> > > > > > However,
> > > > > > > > you
> > > > > > > > > > are right that this will not handle the race condition
> > where
> > > > > > > > > non-divergent
> > > > > > > > > > offset found by findOffset() could change again before
> the
> > > > > consumer
> > > > > > > > does
> > > > > > > > > > the first fetch.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Regarding position() — if we add position that returns
> > > (offset,
> > > > > > > leader
> > > > > > > > > > epoch), this is specifically a position after a record
> that
> > > was
> > > > > > > > actually
> > > > > > > > > > consumed or position of a committed record. In which
> case,
> > I
> > > > > still
> > > > > > > > think
> > > > > > > > > > it’s cleaner to get a record position of consumed message
> > > from
> > > > a
> > > > > > new
> > > > > > > > > helper
> > > > > > > > > > method in ConsumerRecords() or from committed offsets.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I think all the use-cases could be then covered with:
> > > > > > > > > >
> > > > > > > > > > (Approach 1)
> > > > > > > > > >
> > > > > > > > > > seekToRecord(offset, leaderEpoch) — this will just
> > > > initialize/set
> > > > > > the
> > > > > > > > > > consumer state;
> > > > > > > > > >
> > > > > > > > > > findOffsets(offset, leaderEpoch) returns {offset,
> > > leaderEpoch}
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > If we agree that the race condition is also a corner
> case,
> > > > then I
> > > > > > > think
> > > > > > > > > we
> > > > > > > > > > can cover use-cases with:
> > > > > > > > > >
> > > > > > > > > > (Approach 2)
> > > > > > > > > >
> > > > > > > > > > findOffsets(offset, leaderEpoch) returns offset — we
> still
> > > want
> > > > > > > leader
> > > > > > > > > > epoch as a parameter for the users who store their
> > committed
> > > > > > offsets
> > > > > > > > > > externally.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I am actually now leaning more to approach 1, since it is
> > > more
> > > > > > > > explicit,
> > > > > > > > > > and maybe there are more use cases for it.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Anna
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <
> > > lindon...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Anna,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the comment. To answer your question, it
> seems
> > > > that
> > > > > we
> > > > > > > can
> > > > > > > > > > cover
> > > > > > > > > > > all case in this KIP. As stated in "Consumer Handling"
> > > > section,
> > > > > > > > KIP-101
> > > > > > > > > > > based approach will be used to derive the truncation
> > offset
> > > > > from
> > > > > > > the
> > > > > > > > > > > 2-tuple (offset, leaderEpoch). This approach is best
> > effort
> > > > and
> > > > > > it
> > > > > > > is
> > > > > > > > > > > inaccurate only in very rare scenarios (as described in
> > > > > KIP-279).
> > > > > > > > > > >
> > > > > > > > > > > By using seek(offset, leaderEpoch), consumer will still
> > be
> > > > able
> > > > > > to
> > > > > > > > > follow
> > > > > > > > > > > this best-effort approach to detect log truncation and
> > > > > determine
> > > > > > > the
> > > > > > > > > > > truncation offset. On the other hand, if we use
> > > seek(offset),
> > > > > > > > consumer
> > > > > > > > > > will
> > > > > > > > > > > not detect log truncation in some cases which weakens
> the
> > > > > > guarantee
> > > > > > > > of
> > > > > > > > > > this
> > > > > > > > > > > KIP. Does this make sense?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Dong
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <
> > > > > a...@confluent.io
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Sorry, I hit "send" before finishing. Continuing...
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > 2) Hiding most of the consumer handling log
> truncation
> > > > logic
> > > > > > with
> > > > > > > > > > minimal
> > > > > > > > > > > > exposure in KafkaConsumer API.  I was proposing this
> > > path.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Before answering your specific questions… I want to
> > > answer
> > > > to
> > > > > > > your
> > > > > > > > > > > comment
> > > > > > > > > > > > “In general, maybe we should discuss the final
> solution
> > > > that
> > > > > > > covers
> > > > > > > > > all
> > > > > > > > > > > > cases?”. With current KIP, we don’t cover all cases
> of
> > > > > consumer
> > > > > > > > > > detecting
> > > > > > > > > > > > log truncation because the KIP proposes a leader
> epoch
> > > > cache
> > > > > in
> > > > > > > > > > consumer
> > > > > > > > > > > > that does not persist across restarts. Plus, we only
> > > store
> > > > > last
> > > > > > > > > > committed
> > > > > > > > > > > > offset (either internally or users can store
> > externally).
> > > > > This
> > > > > > > has
> > > > > > > > a
> > > > > > > > > > > > limitation that the consumer will not always be able
> to
> > > > find
> > > > > > > point
> > > > > > > > of
> > > > > > > > > > > > truncation just because we have a limited history
> (just
> > > one
> > > > > > data
> > > > > > > > > > point).
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > So, maybe we should first agree on whether we accept
> > that
> > > > > > storing
> > > > > > > > > last
> > > > > > > > > > > > committed offset/leader epoch has a limitation that
> the
> > > > > > consumer
> > > > > > > > will
> > > > > > > > > > not
> > > > > > > > > > > > be able to detect log truncation in all cases?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Anna
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <
> > > > > > a...@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the follow up! I finally have much more
> > > clear
> > > > > > > > > > understanding
> > > > > > > > > > > of
> > > > > > > > > > > > > where you are coming from.
> > > > > > > > > > > > >
> > > > > > > > > > > > > You are right. The success of
> findOffsets()/finding a
> > > > point
> > > > > > of
> > > > > > > > > > > > > non-divergence depends on whether we have enough
> > > entries
> > > > in
> > > > > > the
> > > > > > > > > > > > consumer's
> > > > > > > > > > > > > leader epoch cache. However, I think this is a
> > > > fundamental
> > > > > > > > > limitation
> > > > > > > > > > > of
> > > > > > > > > > > > > having a leader epoch cache that does not persist
> > > across
> > > > > > > consumer
> > > > > > > > > > > > restarts.
> > > > > > > > > > > > >
> > > > > > > > > > > > > If we consider the general case where consumer may
> or
> > > may
> > > > > not
> > > > > > > > have
> > > > > > > > > > this
> > > > > > > > > > > > > cache, then I see two paths:
> > > > > > > > > > > > > 1) Letting the user to track the leader epoch
> history
> > > > > > > externally,
> > > > > > > > > and
> > > > > > > > > > > > have
> > > > > > > > > > > > > more exposure to leader epoch and finding point of
> > > > > > > non-divergence
> > > > > > > > > in
> > > > > > > > > > > > > KafkaConsumer API. I understand this is the case
> you
> > > were
> > > > > > > talking
> > > > > > > > > > > about.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <
> > > > > > lindon...@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Hey Anna,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Thanks much for your detailed explanation and
> > example!
> > > > It
> > > > > > does
> > > > > > > > > help
> > > > > > > > > > me
> > > > > > > > > > > > >> understand the difference between our
> understanding.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> So it seems that the solution based on
> findOffsets()
> > > > > > currently
> > > > > > > > > > focuses
> > > > > > > > > > > > >> mainly on the scenario that consumer has cached
> > > > > leaderEpoch
> > > > > > ->
> > > > > > > > > > offset
> > > > > > > > > > > > >> mapping whereas I was thinking about the general
> > case
> > > > > where
> > > > > > > > > consumer
> > > > > > > > > > > may
> > > > > > > > > > > > >> or
> > > > > > > > > > > > >> may not have this cache. I guess that is why we
> have
> > > > > > different
> > > > > > > > > > > > >> understanding here. I have some comments below.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> 3) The proposed solution using findOffsets(offset,
> > > > > > > leaderEpoch)
> > > > > > > > > > > followed
> > > > > > > > > > > > >> by
> > > > > > > > > > > > >> seek(offset) works if consumer has the cached
> > > > leaderEpoch
> > > > > ->
> > > > > > > > > offset
> > > > > > > > > > > > >> mapping. But if we assume consumer has this cache,
> > do
> > > we
> > > > > > need
> > > > > > > to
> > > > > > > > > > have
> > > > > > > > > > > > >> leaderEpoch in the findOffsets(...)? Intuitively,
> > the
> > > > > > > > > > > > findOffsets(offset)
> > > > > > > > > > > > >> can also derive the leaderEpoch using offset just
> > like
> > > > the
> > > > > > > > > proposed
> > > > > > > > > > > > >> solution does with seek(offset).
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> 4) If consumer does not have cached leaderEpoch ->
> > > > offset
> > > > > > > > mapping,
> > > > > > > > > > > which
> > > > > > > > > > > > >> is
> > > > > > > > > > > > >> the case if consumer is restarted on a new
> machine,
> > > then
> > > > > it
> > > > > > is
> > > > > > > > not
> > > > > > > > > > > clear
> > > > > > > > > > > > >> what leaderEpoch would be included in the
> > FetchRequest
> > > > if
> > > > > > > > consumer
> > > > > > > > > > > does
> > > > > > > > > > > > >> seek(offset). This is the case that motivates the
> > > first
> > > > > > > question
> > > > > > > > > of
> > > > > > > > > > > the
> > > > > > > > > > > > >> previous email. In general, maybe we should
> discuss
> > > the
> > > > > > final
> > > > > > > > > > solution
> > > > > > > > > > > > >> that
> > > > > > > > > > > > >> covers all cases?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> 5) The second question in my previous email is
> > related
> > > > to
> > > > > > the
> > > > > > > > > > > following
> > > > > > > > > > > > >> paragraph:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> "... In some cases, offsets returned from
> position()
> > > > could
> > > > > > be
> > > > > > > > > actual
> > > > > > > > > > > > >> consumed messages by this consumer identified by
> > > > {offset,
> > > > > > > leader
> > > > > > > > > > > epoch}.
> > > > > > > > > > > > >> In
> > > > > > > > > > > > >> other cases, position() returns offset that was
> not
> > > > > actually
> > > > > > > > > > consumed.
> > > > > > > > > > > > >> Suppose, the user calls position() for the last
> > > > > offset...".
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> I guess my point is that, if user calls position()
> > for
> > > > the
> > > > > > > last
> > > > > > > > > > offset
> > > > > > > > > > > > and
> > > > > > > > > > > > >> uses that offset in seek(...), then user can
> > probably
> > > > just
> > > > > > > call
> > > > > > > > > > > > >> Consumer#seekToEnd() without calling position()
> and
> > > > > > seek(...).
> > > > > > > > > > > Similarly
> > > > > > > > > > > > >> user can call Consumer#seekToBeginning() to the
> seek
> > > to
> > > > > the
> > > > > > > > > earliest
> > > > > > > > > > > > >> position without calling position() and seek(...).
> > > Thus
> > > > > > > > position()
> > > > > > > > > > > only
> > > > > > > > > > > > >> needs to return the actual consumed messages
> > > identified
> > > > by
> > > > > > > > > {offset,
> > > > > > > > > > > > leader
> > > > > > > > > > > > >> epoch}. Does this make sense?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > >> Dong
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <
> > > > > > > a...@confluent.io
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> > Hi Dong,
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks for considering my suggestions.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Based on your comments, I realized that my
> > > suggestion
> > > > > was
> > > > > > > not
> > > > > > > > > > > complete
> > > > > > > > > > > > >> with
> > > > > > > > > > > > >> > regard to KafkaConsumer API vs. consumer-broker
> > > > > protocol.
> > > > > > > > While
> > > > > > > > > I
> > > > > > > > > > > > >> propose
> > > > > > > > > > > > >> > to keep KafkaConsumer#seek() unchanged and take
> > > offset
> > > > > > only,
> > > > > > > > the
> > > > > > > > > > > > >> underlying
> > > > > > > > > > > > >> > consumer will send the next FetchRequest() to
> > broker
> > > > > with
> > > > > > > > offset
> > > > > > > > > > and
> > > > > > > > > > > > >> > leaderEpoch if it is known (based on leader
> epoch
> > > > cache
> > > > > in
> > > > > > > > > > > consumer) —
> > > > > > > > > > > > >> note
> > > > > > > > > > > > >> > that this is different from the current KIP,
> which
> > > > > > suggests
> > > > > > > to
> > > > > > > > > > > always
> > > > > > > > > > > > >> send
> > > > > > > > > > > > >> > unknown leader epoch after seek(). This way, if
> > the
> > > > > > consumer
> > > > > > > > > and a
> > > > > > > > > > > > >> broker
> > > > > > > > > > > > >> > agreed on the point of non-divergence, which is
> > some
> > > > > > > {offset,
> > > > > > > > > > > > >> leaderEpoch}
> > > > > > > > > > > > >> > pair, the new leader which causes another
> > truncation
> > > > > (even
> > > > > > > > > further
> > > > > > > > > > > > back)
> > > > > > > > > > > > >> > will be able to detect new divergence and
> restart
> > > the
> > > > > > > process
> > > > > > > > of
> > > > > > > > > > > > finding
> > > > > > > > > > > > >> > the new point of non-divergence. So, to answer
> > your
> > > > > > > question,
> > > > > > > > If
> > > > > > > > > > the
> > > > > > > > > > > > >> > truncation happens just after the user calls
> > > > > > > > > > > > >> > KafkaConsumer#findOffsets(offset, leaderEpoch)
> > > > followed
> > > > > > by
> > > > > > > > > > > > seek(offset),
> > > > > > > > > > > > >> > the user will not seek to the wrong position
> > without
> > > > > > knowing
> > > > > > > > > that
> > > > > > > > > > > > >> > truncation has happened, because the consumer
> will
> > > get
> > > > > > > another
> > > > > > > > > > > > >> truncation
> > > > > > > > > > > > >> > error, and seek again.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I am afraid, I did not understand your second
> > > > question.
> > > > > > Let
> > > > > > > me
> > > > > > > > > > > > >> summarize my
> > > > > > > > > > > > >> > suggestions again, and then give an example to
> > > > hopefully
> > > > > > > make
> > > > > > > > my
> > > > > > > > > > > > >> > suggestions more clear. Also, the last part of
> my
> > > > > example
> > > > > > > > shows
> > > > > > > > > > how
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > use-case in your first question will work. If it
> > > does
> > > > > not
> > > > > > > > answer
> > > > > > > > > > > your
> > > > > > > > > > > > >> > second question, would you mind clarifying? I am
> > > also
> > > > > > > focusing
> > > > > > > > > on
> > > > > > > > > > > the
> > > > > > > > > > > > >> case
> > > > > > > > > > > > >> > of a consumer having enough entries in the
> cache.
> > > The
> > > > > case
> > > > > > > of
> > > > > > > > > > > > restarting
> > > > > > > > > > > > >> > from committed offset either stored externally
> or
> > > > > > internally
> > > > > > > > > will
> > > > > > > > > > > > >> probably
> > > > > > > > > > > > >> > need to be discussed more.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Let me summarize my suggestion again:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 1) KafkaConsumer#seek() and
> > KafkaConsumer#position()
> > > > > > remains
> > > > > > > > > > > unchanged
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 2) New KafkaConsumer#findOffsets() takes
> {offset,
> > > > > > > leaderEpoch}
> > > > > > > > > > pair
> > > > > > > > > > > > per
> > > > > > > > > > > > >> > topic partition and returns offset per topic
> > > > partition.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 3) FetchRequest() to broker after
> > > KafkaConsumer#seek()
> > > > > > will
> > > > > > > > > > contain
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > offset set by seek and leaderEpoch that
> > corresponds
> > > to
> > > > > the
> > > > > > > > > offset
> > > > > > > > > > > > based
> > > > > > > > > > > > >> on
> > > > > > > > > > > > >> > leader epoch cache in the consumer.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > The rest of this e-mail is a long and contrived
> > > > example
> > > > > > with
> > > > > > > > > > several
> > > > > > > > > > > > log
> > > > > > > > > > > > >> > truncations and unclean leader elections to
> > > illustrate
> > > > > the
> > > > > > > API
> > > > > > > > > and
> > > > > > > > > > > > your
> > > > > > > > > > > > >> > first use-case. Suppose we have three brokers.
> > > > > Initially,
> > > > > > > > Broker
> > > > > > > > > > A,
> > > > > > > > > > > B,
> > > > > > > > > > > > >> and
> > > > > > > > > > > > >> > C has one message at offset 0 with leader epoch
> 0.
> > > > Then,
> > > > > > > > Broker
> > > > > > > > > A
> > > > > > > > > > > goes
> > > > > > > > > > > > >> down
> > > > > > > > > > > > >> > for some time. Broker B becomes a leader with
> > epoch
> > > 1,
> > > > > and
> > > > > > > > > writes
> > > > > > > > > > > > >> messages
> > > > > > > > > > > > >> > to offsets 1 and 2. Broker C fetches offset 1,
> but
> > > > > before
> > > > > > > > > fetching
> > > > > > > > > > > > >> offset
> > > > > > > > > > > > >> > 2, becomes a leader with leader epoch 2 and
> > writes a
> > > > > > message
> > > > > > > > at
> > > > > > > > > > > offset
> > > > > > > > > > > > >> 2.
> > > > > > > > > > > > >> > Here is the state of brokers at this point:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > Broker A:
> > > > > > > > > > > > >> > > offset 0, epoch 0 <— leader
> > > > > > > > > > > > >> > > goes down…
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > Broker B:
> > > > > > > > > > > > >> > > offset 0, epoch 0
> > > > > > > > > > > > >> > > offset 1, epoch 1  <- leader
> > > > > > > > > > > > >> > > offset 2, epoch 1
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Broker C:
> > > > > > > > > > > > >> > > offset 0, epoch 0
> > > > > > > > > > > > >> > > offset 1, epoch 1
> > > > > > > > > > > > >> > > offset 2, epoch 2 <— leader
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Before Broker C becomes a leader with leader
> epoch
> > > 2,
> > > > > the
> > > > > > > > > consumer
> > > > > > > > > > > > >> consumed
> > > > > > > > > > > > >> > the following messages from broker A and broker
> B:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > {offset=0, leaderEpoch=0}, {offset=1,
> > > leaderEpoch=1},
> > > > > > > > {offset=2,
> > > > > > > > > > > > >> > leaderEpoch=1}.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Consumer’s leader epoch cache at this point
> > contains
> > > > the
> > > > > > > > > following
> > > > > > > > > > > > >> entries:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > (leaderEpoch=0, startOffset=0)
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > (leaderEpoch=1, startOffset=1)
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > endOffset = 3
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Then, broker B becomes the follower of broker C,
> > > > > truncates
> > > > > > > and
> > > > > > > > > > > starts
> > > > > > > > > > > > >> > fetching from offset 2.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Consumer sends fetchRequest(offset=3,
> > leaderEpoch=1)
> > > > and
> > > > > > > gets
> > > > > > > > > > > > >> > LOG_TRUNCATION
> > > > > > > > > > > > >> > error from broker C.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > In response, the client calls
> > > > KafkaConsumer#findOffsets(
> > > > > > > > > offset=3,
> > > > > > > > > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > > > > > > > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker C
> > > > responds
> > > > > > with
> > > > > > > > > > > > >> > {leaderEpoch=1, endOffset=2}. So,
> > > > > > > > > > > KafkaConsumer#findOffsets(offset=3,
> > > > > > > > > > > > >> > leaderEpoch=1) returns offset=2.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > In response, consumer calls KafkaConsumer@seek
> > > > > (offset=2)
> > > > > > > > > followed
> > > > > > > > > > > by
> > > > > > > > > > > > >> > poll(), which results in FetchRequest(offset=2,
> > > > > > > leaderEpoch=1)
> > > > > > > > > to
> > > > > > > > > > > > >> broker C.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I will continue with this example with the goal
> to
> > > > > answer
> > > > > > > your
> > > > > > > > > > first
> > > > > > > > > > > > >> > question about truncation just after
> findOffsets()
> > > > > > followed
> > > > > > > by
> > > > > > > > > > > seek():
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Suppose, brokers B and C go down, and broker A
> > comes
> > > > up
> > > > > > and
> > > > > > > > > > becomes
> > > > > > > > > > > a
> > > > > > > > > > > > >> > leader with leader epoch 3, and writes a message
> > to
> > > > > offset
> > > > > > > 1.
> > > > > > > > > > > Suppose,
> > > > > > > > > > > > >> this
> > > > > > > > > > > > >> > happens before the consumer gets response from
> > > broker
> > > > C
> > > > > to
> > > > > > > the
> > > > > > > > > > > > previous
> > > > > > > > > > > > >> > fetch request:  FetchRequest(offset=2,
> > > leaderEpoch=1).
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Consumer re-sends FetchRequest(offset=2,
> > > > leaderEpoch=1)
> > > > > to
> > > > > > > > > broker
> > > > > > > > > > A,
> > > > > > > > > > > > >> which
> > > > > > > > > > > > >> > returns LOG_TRUNCATION error, because broker A
> has
> > > > > leader
> > > > > > > > epoch
> > > > > > > > > 3
> > > > > > > > > > >
> > > > > > > > > > > > >> leader
> > > > > > > > > > > > >> > epoch in FetchRequest with starting offset = 1 <
> > > > offset
> > > > > 2
> > > > > > in
> > > > > > > > > > > > >> > FetchRequest().
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > In response, the user calls
> > > KafkaConsumer#findOffsets(
> > > > > > > > offset=2,
> > > > > > > > > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > > > > > > > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker A
> > > > responds
> > > > > > with
> > > > > > > > > > > > >> > {leaderEpoch=0, endOffset=1}; the underlying
> > > consumer
> > > > > > finds
> > > > > > > > > > > > leaderEpoch
> > > > > > > > > > > > >> = 0
> > > > > > > > > > > > >> > in its cache with end offset == 1, which results
> > in
> > > > > > > > > > > > >> > KafkaConsumer#findOffsets(offset=2,
> > leaderEpoch=1)
> > > > > > returning
> > > > > > > > > > offset
> > > > > > > > > > > > = 1.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > In response, the user calls KafkaConsumer@seek
> > > > > (offset=1)
> > > > > > > > > followed
> > > > > > > > > > > by
> > > > > > > > > > > > >> > poll(), which results in FetchRequest(offset=1,
> > > > > > > leaderEpoch=0)
> > > > > > > > > to
> > > > > > > > > > > > >> broker A,
> > > > > > > > > > > > >> > which responds with message at offset 1, leader
> > > epoch
> > > > 3.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I will think some more about consumers
> restarting
> > > from
> > > > > > > > committed
> > > > > > > > > > > > >> offsets,
> > > > > > > > > > > > >> > and send a follow up.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Anna
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <
> > > > > > > lindon...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > Hey Anna,
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Thanks much for the thoughtful reply. It makes
> > > sense
> > > > > to
> > > > > > > > > > different
> > > > > > > > > > > > >> between
> > > > > > > > > > > > >> > > "seeking to a message" and "seeking to a
> > > position".
> > > > I
> > > > > > have
> > > > > > > > to
> > > > > > > > > > > > >> questions
> > > > > > > > > > > > >> > > here:
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > - For "seeking to a message" use-case, with
> the
> > > > > proposed
> > > > > > > > > > approach
> > > > > > > > > > > > user
> > > > > > > > > > > > >> > > needs to call findOffset(offset, leaderEpoch)
> > > > followed
> > > > > > by
> > > > > > > > > > > > >> seek(offset).
> > > > > > > > > > > > >> > If
> > > > > > > > > > > > >> > > message truncation and message append happen
> > > > > immediately
> > > > > > > > after
> > > > > > > > > > > > >> > > findOffset(offset,
> > > > > > > > > > > > >> > > leaderEpoch) but before seek(offset), it seems
> > > that
> > > > > user
> > > > > > > > will
> > > > > > > > > > seek
> > > > > > > > > > > > to
> > > > > > > > > > > > >> the
> > > > > > > > > > > > >> > > wrong message without knowing the truncation
> has
> > > > > > happened.
> > > > > > > > > Would
> > > > > > > > > > > > this
> > > > > > > > > > > > >> be
> > > > > > > > > > > > >> > a
> > > > > > > > > > > > >> > > problem?
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > - For "seeking to a position" use-case, it
> seems
> > > > that
> > > > > > > there
> > > > > > > > > can
> > > > > > > > > > be
> > > > > > > > > > > > two
> > > > > > > > > > > > >> > > positions, i.e. earliest and latest. So these
> > two
> > > > > cases
> > > > > > > can
> > > > > > > > be
> > > > > > > > > > > > >> > > Consumer.fulfilled by seekToBeginning() and
> > > > > > > > > > Consumer.seekToEnd().
> > > > > > > > > > > > >> Then it
> > > > > > > > > > > > >> > > seems that user will only need to call
> > position()
> > > > and
> > > > > > > seek()
> > > > > > > > > for
> > > > > > > > > > > > >> "seeking
> > > > > > > > > > > > >> > > to a message" use-case?
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Thanks,
> > > > > > > > > > > > >> > > Dong
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner
> <
> > > > > > > > > > a...@confluent.io>
> > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > Hi Jason and Dong,
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > I’ve been thinking about your suggestions
> and
> > > > > > discussion
> > > > > > > > > > > regarding
> > > > > > > > > > > > >> > > > position(), seek(), and new proposed API.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > Here is my thought process why we should
> keep
> > > > > > position()
> > > > > > > > and
> > > > > > > > > > > > seek()
> > > > > > > > > > > > >> API
> > > > > > > > > > > > >> > > > unchanged.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > I think we should separate {offset, leader
> > > epoch}
> > > > > that
> > > > > > > > > > uniquely
> > > > > > > > > > > > >> > > identifies
> > > > > > > > > > > > >> > > > a message from an offset that is a position.
> > In
> > > > some
> > > > > > > > cases,
> > > > > > > > > > > > offsets
> > > > > > > > > > > > >> > > > returned from position() could be actual
> > > consumed
> > > > > > > messages
> > > > > > > > > by
> > > > > > > > > > > this
> > > > > > > > > > > > >> > > consumer
> > > > > > > > > > > > >> > > > identified by {offset, leader epoch}. In
> other
> > > > > cases,
> > > > > > > > > > position()
> > > > > > > > > > > > >> > returns
> > > > > > > > > > > > >> > > > offset that was not actually consumed.
> > Suppose,
> > > > the
> > > > > > user
> > > > > > > > > calls
> > > > > > > > > > > > >> > position()
> > > > > > > > > > > > >> > > > for the last offset. Suppose we return
> > {offset,
> > > > > leader
> > > > > > > > > epoch}
> > > > > > > > > > of
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > > > message currently in the log. Then, the
> > message
> > > > gets
> > > > > > > > > truncated
> > > > > > > > > > > > >> before
> > > > > > > > > > > > >> > > > consumer’s first poll(). It does not make
> > sense
> > > > for
> > > > > > > poll()
> > > > > > > > > to
> > > > > > > > > > > fail
> > > > > > > > > > > > >> in
> > > > > > > > > > > > >> > > this
> > > > > > > > > > > > >> > > > case, because the log truncation did not
> > > actually
> > > > > > happen
> > > > > > > > > from
> > > > > > > > > > > the
> > > > > > > > > > > > >> > > consumer
> > > > > > > > > > > > >> > > > perspective. On the other hand, as the KIP
> > > > proposes,
> > > > > > it
> > > > > > > > > makes
> > > > > > > > > > > > sense
> > > > > > > > > > > > >> for
> > > > > > > > > > > > >> > > the
> > > > > > > > > > > > >> > > > committed() method to return {offset, leader
> > > > epoch}
> > > > > > > > because
> > > > > > > > > > > those
> > > > > > > > > > > > >> > offsets
> > > > > > > > > > > > >> > > > represent actual consumed messages.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > The same argument applies to the seek()
> > method —
> > > > we
> > > > > > are
> > > > > > > > not
> > > > > > > > > > > > seeking
> > > > > > > > > > > > >> to
> > > > > > > > > > > > >> > a
> > > > > > > > > > > > >> > > > message, we are seeking to a position.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > I like the proposal to add
> > > > > KafkaConsumer#findOffsets()
> > > > > > > > API.
> > > > > > > > > I
> > > > > > > > > > am
> > > > > > > > > > > > >> > assuming
> > > > > > > > > > > > >> > > > something like:
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > Map<TopicPartition, Long>
> > > > > > > findOffsets(Map<TopicPartition,
> > > > > > > > > > > > >> > OffsetAndEpoch>
> > > > > > > > > > > > >> > > > offsetsToSearch)
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > Similar to seek() and position(), I think
> > > > > > findOffsets()
> > > > > > > > > should
> > > > > > > > > > > > >> return
> > > > > > > > > > > > >> > > > offset without leader epoch, because what we
> > > want
> > > > is
> > > > > > the
> > > > > > > > > > offset
> > > > > > > > > > > > >> that we
> > > > > > > > > > > > >> > > > think is closest to the not divergent
> message
> > > from
> > > > > the
> > > > > > > > given
> > > > > > > > > > > > >> consumed
> > > > > > > > > > > > >> > > > message. Until the consumer actually fetches
> > the
> > > > > > > message,
> > > > > > > > we
> > > > > > > > > > > > should
> > > > > > > > > > > > >> not
> > > > > > > > > > > > >> > > let
> > > > > > > > > > > > >> > > > the consumer store the leader epoch for a
> > > message
> > > > it
> > > > > > did
> > > > > > > > not
> > > > > > > > > > > > >> consume.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > So, the workflow will be:
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > 1) The user gets LogTruncationException with
> > > > > {offset,
> > > > > > > > leader
> > > > > > > > > > > epoch
> > > > > > > > > > > > >> of
> > > > > > > > > > > > >> > the
> > > > > > > > > > > > >> > > > previous message} (whatever we send with new
> > > > > > > FetchRecords
> > > > > > > > > > > > request).
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > 2) offset = findOffsets(tp -> {offset,
> leader
> > > > > epoch})
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > 3) seek(offset)
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > For the use-case where the users store
> > committed
> > > > > > offsets
> > > > > > > > > > > > externally:
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > 1) Such users would have to track the leader
> > > epoch
> > > > > > > > together
> > > > > > > > > > with
> > > > > > > > > > > > an
> > > > > > > > > > > > >> > > offset.
> > > > > > > > > > > > >> > > > Otherwise, there is no way to detect later
> > what
> > > > > leader
> > > > > > > > epoch
> > > > > > > > > > was
> > > > > > > > > > > > >> > > associated
> > > > > > > > > > > > >> > > > with the message. I think it’s reasonable to
> > ask
> > > > > that
> > > > > > > from
> > > > > > > > > > users
> > > > > > > > > > > > if
> > > > > > > > > > > > >> > they
> > > > > > > > > > > > >> > > > want to detect log truncation. Otherwise,
> they
> > > > will
> > > > > > get
> > > > > > > > the
> > > > > > > > > > > > current
> > > > > > > > > > > > >> > > > behavior.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > If the users currently get an offset to be
> > > stored
> > > > > > using
> > > > > > > > > > > > position(),
> > > > > > > > > > > > >> I
> > > > > > > > > > > > >> > see
> > > > > > > > > > > > >> > > > two possibilities. First, they call save
> > offset
> > > > > > returned
> > > > > > > > > from
> > > > > > > > > > > > >> > position()
> > > > > > > > > > > > >> > > > that they call before poll(). In that case,
> it
> > > > would
> > > > > > not
> > > > > > > > be
> > > > > > > > > > > > correct
> > > > > > > > > > > > >> to
> > > > > > > > > > > > >> > > > store {offset, leader epoch} if we would
> have
> > > > > changed
> > > > > > > > > > position()
> > > > > > > > > > > > to
> > > > > > > > > > > > >> > > return
> > > > > > > > > > > > >> > > > {offset, leader epoch} since actual fetched
> > > > message
> > > > > > > could
> > > > > > > > be
> > > > > > > > > > > > >> different
> > > > > > > > > > > > >> > > > (from the example I described earlier). So,
> it
> > > > would
> > > > > > be
> > > > > > > > more
> > > > > > > > > > > > >> correct to
> > > > > > > > > > > > >> > > > call position() after poll(). However, the
> > user
> > > > > > already
> > > > > > > > gets
> > > > > > > > > > > > >> > > > ConsumerRecords at this point, from which
> the
> > > user
> > > > > can
> > > > > > > > > extract
> > > > > > > > > > > > >> {offset,
> > > > > > > > > > > > >> > > > leader epoch} of the last message.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > So, I like the idea of adding a helper
> method
> > to
> > > > > > > > > > > ConsumerRecords,
> > > > > > > > > > > > as
> > > > > > > > > > > > >> > > Jason
> > > > > > > > > > > > >> > > > proposed, something like:
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > public OffsetAndEpoch
> > > lastOffsetWithLeaderEpoch(),
> > > > > > where
> > > > > > > > > > > > >> OffsetAndEpoch
> > > > > > > > > > > > >> > > is
> > > > > > > > > > > > >> > > > a data struct holding {offset, leader
> epoch}.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > In this case, we would advise the user to
> > follow
> > > > the
> > > > > > > > > workflow:
> > > > > > > > > > > > >> poll(),
> > > > > > > > > > > > >> > > get
> > > > > > > > > > > > >> > > > {offset, leader epoch} from
> > > > > > > ConsumerRecords#lastOffsetWith
> > > > > > > > > > > > >> > LeaderEpoch(),
> > > > > > > > > > > > >> > > > save offset and leader epoch, process
> records.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > 2) When the user needs to seek to the last
> > > > committed
> > > > > > > > offset,
> > > > > > > > > > > they
> > > > > > > > > > > > >> call
> > > > > > > > > > > > >> > > new
> > > > > > > > > > > > >> > > > findOffsets(saved offset, leader epoch), and
> > > then
> > > > > > > > > > seek(offset).
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > What do you think?
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > Anna
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jason,
> >
> > Thanks much for your thoughtful explanation.
> >
> > Yes, the solution using findOffsets(offset, leaderEpoch) also works. The
> > advantage of this solution is that it adds only one API instead of two.
> > The concern is that its usage seems a bit more clumsy for advanced users.
> > More specifically, advanced users who store offsets externally will
> > always need to call findOffsets() before calling seek(offset) during
> > consumer initialization. And those advanced users will need to manually
> > keep track of the leaderEpoch of the last ConsumerRecord.
> >
> > The other solution, which may be more user-friendly for advanced users,
> > is to add two APIs, `void seek(offset, leaderEpoch)` and
> > `(offset, epoch) = offsetEpochs(topicPartition)`.
> >
> > I kind of prefer the second solution because it is easier to use for
> > advanced users. If we need to expose leaderEpoch anyway to safely
> > identify a message, it may be conceptually simpler to expose it directly
> > in seek(...) rather than requiring one more translation using
> > findOffsets(...). But I am also OK with the first solution if other
> > developers also favor that one :)
> >
> > Thanks,
> > Dong
> >
> > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks, I've been thinking about your suggestions a bit. It is
> > > challenging to make this work given the current APIs. One of the
> > > difficulties is that we don't have an API to find the leader epoch for
> > > a given offset at the moment. So if the user does a seek to offset 5,
> > > then we'll need a new API to find the corresponding epoch in order to
> > > fulfill the new position() API. Potentially we could modify ListOffsets
> > > to enable finding the leader epoch, but I am not sure it is worthwhile.
> > > Perhaps it is reasonable for advanced usage to expect that the epoch
> > > information, if needed, will be extracted from the records directly? It
> > > might make sense to expose a helper in `ConsumerRecords` to make this a
> > > little easier though.
> > >
> > > Alternatively, if we think it is important to have this information
> > > exposed directly, we could create batch APIs to solve the naming
> > > problem. For example:
> > >
> > > Map<TopicPartition, OffsetAndEpoch> positions();
> > > void seek(Map<TopicPartition, OffsetAndEpoch> positions);
> > >
> > > However, I'm actually leaning toward leaving the seek() and position()
> > > APIs unchanged. Instead, we can add a new API to search for offset by
> > > timestamp or by offset/leader epoch. Let's say we call it `findOffsets`.
> > > If the user hits a log truncation error, they can use this API to find
> > > the closest offset and then do a seek(). At the same time, we deprecate
> > > the `offsetsForTimes` APIs. We now have two use cases which require
> > > finding offsets, so I think we should make this API general and leave
> > > the door open for future extensions.
> > >
> > > By the way, I'm unclear about the desire to move part of this
> > > functionality to AdminClient. Guozhang suggested this previously, but I
> > > think it only makes sense for cross-cutting capabilities such as topic
> > > creation. If we have an API which is primarily useful for consumers,
> > > then I think that's where it should be exposed. The AdminClient also has
> > > its own API integrity and should not become a dumping ground for
> > > advanced use cases. I'll update the KIP with the `findOffsets` API
> > > suggested above and we can see if it does a good enough job of keeping
> > > the API simple for common cases.
> > >
> > > Thanks,
> > > Jason
> > >
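To make the recovery path concrete, here is a rough sketch of how the proposed findOffsets() might be used after a log truncation error. findOffsets() and PartitionTruncationException are names under discussion in this thread, their final signatures are still open, and partition, lastOffset and lastLeaderEpoch are assumed to be tracked by the application:

```
// Illustrative only: findOffsets() and PartitionTruncationException are
// proposals in this thread; final names and signatures may differ.
try {
    process(consumer.poll(Duration.ofMillis(100))); // hypothetical handler
} catch (PartitionTruncationException e) {
    // The fetch position no longer exists on the current leader (the log was
    // truncated). Ask the broker for the closest valid offset for the last
    // known {offset, leader epoch}, then resume from there.
    long closest = consumer.findOffsets(partition, lastOffset, lastLeaderEpoch);
    consumer.seek(partition, closest);
}
```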
> > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jason,
> > > >
> > > > Regarding seek(...), it seems that we want an API for the user to
> > > > initialize the consumer with (offset, leaderEpoch), and that API
> > > > should allow throwing PartitionTruncationException. Suppose we agree
> > > > on this, then seekToNearest() is not sufficient because it will always
> > > > swallow PartitionTruncationException. Here we have two options. The
> > > > first option is to add the API offsetsForLeaderEpochs() to translate
> > > > (leaderEpoch, offset) to offset. The second option is to add
> > > > seek(offset, leaderEpoch). It seems that the second option may be
> > > > simpler because it makes it clear that (offset, leaderEpoch) will be
> > > > used to identify the consumer's position in a partition. And the user
> > > > only needs to handle PartitionTruncationException from poll(). In
> > > > comparison, the first option seems a bit harder to use because the
> > > > user also has to handle PartitionTruncationException if
> > > > offsetsForLeaderEpochs() returns a different offset from the
> > > > user-provided offset. What do you think?
> > > >
> > > > If we decide to add the API seek(offset, leaderEpoch), then we can
> > > > decide whether and how to add an API to translate (offset,
> > > > leaderEpoch) to offset. It seems that this API will be needed by
> > > > advanced users who don't want auto offset reset (so that they can be
> > > > notified) but still want to reset the offset to the closest one. For
> > > > those users it probably makes sense to only have the API in
> > > > AdminClient. offsetsForTimes() seems like a common API that will be
> > > > needed by users of the consumer in general, so it may be more
> > > > reasonable for it to stay in the consumer API. I don't have a strong
> > > > opinion on whether offsetsForTimes() should be replaced by an API in
> > > > AdminClient.
> > > >
> > > > Though (offset, leaderEpoch) is needed to uniquely identify a message
> > > > in general, it is only needed for advanced users who have turned on
> > > > unclean leader election, need to use seek(..), and don't want auto
> > > > offset reset. Most other users probably just want to enable auto
> > > > offset reset and store offsets in Kafka. Thus we might want to keep
> > > > the existing offset-only APIs (e.g. seek() and position()) for most
> > > > users while adding new APIs for advanced users. And yes, it seems that
> > > > we need a new name for position().
> > > >
> > > > Though I think we need new APIs to carry the new information (e.g.
> > > > leaderEpoch), I am not very sure how that should look. One possible
> > > > option is those APIs in KIP-232. Another option is something like
> > > > this:
> > > >
> > > > ```
> > > > class OffsetEpochs {
> > > >   long offset;
> > > >   int leaderEpoch;
> > > >   int partitionEpoch;   // This may be needed later as discussed in KIP-232
> > > >   ... // Hopefully these are all we need to identify a message in Kafka.
> > > >       // But if we need more then we can add new fields in this class.
> > > > }
> > > >
> > > > OffsetEpochs offsetEpochs(TopicPartition);
> > > >
> > > > void seek(TopicPartition, OffsetEpochs);
> > > > ```
> > > >
> > > > Thanks,
> > > > Dong
> > > >
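For comparison with the findOffsets() workflow sketched earlier, a rough usage sketch of the two-API alternative Dong outlines above. OffsetEpochs, offsetEpochs() and the overloaded seek() are proposals from this message, not existing APIs, and the external-store helpers are hypothetical:

```
// Sketch of the alternative above; none of these APIs exist yet.
// Checkpoint: capture the full position (offset plus epochs) rather than a bare offset.
OffsetEpochs position = consumer.offsetEpochs(partition);
externalStore.save(partition, position); // hypothetical external storage

// Restore: seek with the full position so the broker can detect divergence, which
// would then surface as PartitionTruncationException from a later poll().
consumer.seek(partition, externalStore.load(partition));
```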
> > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hey Dong,
> > > > >
> > > > > Thanks for the feedback. The first three points are easy:
> > > > >
> > > > > 1. Yes, we should be consistent.
> > > > > 2. Yes, I will add this.
> > > > > 3. Yes, I think we should document the changes to the committed offset