Hi Jun,

I spent a little more time looking at the usage in WorkerSinkTask. I don't
think the initialization of the positions in the assignment callback is
strictly necessary. We keep a map of the current consumed offsets, which is
updated as we consume the data. As far as I can tell, we could either
skip the initialization and wait until the first fetched records come in or
we could use the committed() API to initialize positions. I think the root
of it is the argument Anna made previously. The leader epoch lets us track
the history of records that we have consumed. It is only useful when we
want to tell whether records we have consumed were lost. So getting the
leader epoch of an arbitrary position that was seeked doesn't really make
sense. The dependence on the consumed records is most explicit if we only
expose the leader epoch inside the fetched records. We might consider
adding a `lastConsumedLeaderEpoch` API to expose it directly, but I'm
inclined to leave that as potential future work.
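
To make the committed()-based option concrete, here is a rough sketch of what
the rebalance callback could look like (illustration only, not a patch against
WorkerSinkTask):

private class HandleRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        lastCommittedOffsets = new HashMap<>();
        currentOffsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            // committed() returns what was actually committed for the group
            // (including the last leader epoch, if one was committed), so we
            // don't have to guess an epoch for a seeked position.
            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed != null) {
                lastCommittedOffsets.put(tp, committed);
                currentOffsets.put(tp, committed);
            }
            // If nothing has been committed yet, we can simply wait for the
            // first fetched records to establish the position.
        }
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // unchanged from the current implementation
    }
}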

A couple additional notes:

1. I've renamed OffsetAndMetadata.leaderEpoch to
OffsetAndMetadata.lastLeaderEpoch. The consumer can't actually know what the
leader epoch of the committed offset will be, so the new name just clarifies
the expected usage.
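
For example, with the constructor shape proposed in the KIP and the leader
epoch exposed on fetched records (an illustrative sketch; the details may
still change):

// The committed offset is the *next* offset to consume, and the consumer
// cannot know which leader epoch that future record will have. What it can
// record is the epoch of the last record it actually consumed, hence the
// name lastLeaderEpoch.
static OffsetAndMetadata commitEntryFor(ConsumerRecord<byte[], byte[]> last) {
    return new OffsetAndMetadata(last.offset() + 1, last.leaderEpoch(), "");
}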

2. I decided to add a helper to ConsumerRecords to get the next offsets. We
would use this in WorkerSinkTask and external storage use cases to simplify
the commit logic. If we are consuming batch by batch, then we don't need
the message-level bookkeeping.
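
Roughly, the batch-by-batch commit path would then look like this (the helper
name nextOffsets() below is only a placeholder, not settled):

ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
process(records); // hand the whole batch to the sink task (application code)
// One call yields the next offset (and last leader epoch) for every partition
// in the batch, so no per-record bookkeeping is needed before committing.
Map<TopicPartition, OffsetAndMetadata> nextOffsets = records.nextOffsets();
consumer.commitSync(nextOffsets);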

Thanks,
Jason

On Mon, Aug 6, 2018 at 10:28 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Jun,
>
> Thanks for the review. Responses below:
>
> 50. Yes, that is right. I clarified this in the KIP.
>
> 51. Yes, updated the KIP to mention.
>
> 52. Yeah, this was a reference to a previous iteration. I've fixed it.
>
> 53. I changed the API to use an `Optional<Integer>` for the leader epoch
> and added a note about the default value. Does that seem reasonable?
>
> 54. We discussed this above, but could not find a great option. The
> options are to add a new API (e.g. positionAndEpoch) or to rely on the user
> to get the epoch from the fetched records. We were leaning toward the
> latter, but I admit it was not fully satisfying. In this case, Connect
> would need to track the last consumed offsets manually instead of relying
> on the consumer. We also considered adding a convenience method to
> ConsumerRecords to get the offset to commit for all fetched partitions.
> This makes the additional bookkeeping pretty minimal. What do you think?
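>
> For reference, the "track it manually" option is only a few lines on the
> application side (sketch only, relying on the leader epoch this KIP exposes
> on fetched records):
>
>     Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         TopicPartition tp = new TopicPartition(record.topic(), record.partition());
>         // commit the next offset together with the epoch of the consumed record
>         toCommit.put(tp, new OffsetAndMetadata(record.offset() + 1, record.leaderEpoch(), ""));
>     }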
>
> 55. I clarified in the KIP. I was mainly thinking of situations where a
> previously valid offset becomes out of range.
>
> 56. Yeah, that's a bit annoying. I decided to keep LeaderEpoch as it is
> and use CurrentLeaderEpoch for both OffsetForLeaderEpoch and the Fetch
> APIs. I think Dong suggested this previously as well.
>
> 57. We could, but I'm not sure there's a strong reason to do so. I was
> thinking we would leave it around for convenience, but let me know if you
> think we should do otherwise.
>
>
> Thanks,
> Jason
>
>
> On Fri, Aug 3, 2018 at 4:49 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Jason,
>>
>> Thanks for the updated KIP. Well thought-through. Just a few minor
>> comments
>> below.
>>
>> 50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess
>> that under the covers it will make an OffsetsForLeaderEpoch request to
>> determine if the seeked offset is still valid before fetching? If so, it
>> would be useful to document this in the wiki.
>>
>> 51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I
>> guess the consumer will also make an OffsetsForLeaderEpoch request to
>> determine if the last consumed offset is still valid before fetching? If
>> so, it would be useful to document this in the wiki.
>>
>> 52. "If the consumer seeks to the middle of the log, for example, then we
>> will use the sentinel value -1 and the leader will skip the epoch
>> validation. " Is this true? If the consumer seeks using
>> seek(TopicPartition
>> partition, OffsetAndMetadata offset) and the seeked offset is valid, the
>> consumer can/should use the leaderEpoch in the cached metadata for
>> fetching?
>>
>> 53. OffsetAndMetadata. For backward compatibility, we need to support
>> constructing OffsetAndMetadata without providing leaderEpoch. Could we
>> define the default value of leaderEpoch if not provided and the semantics
>> of that (e.g., skipping the epoch validation)?
>>
>> 54. I saw the following code in WorkerSinkTask in Connect. It saves the
>> offset obtained through position(), which can be committed later. Since
>> position() doesn't return the leaderEpoch, this can lead to a committed
>> offset without a leaderEpoch. Not sure how common this usage is, but what's
>> the recommendation for such users?
>>
>> private class HandleRebalance implements ConsumerRebalanceListener {
>>     @Override
>>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>>         log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>>         lastCommittedOffsets = new HashMap<>();
>>         currentOffsets = new HashMap<>();
>>         for (TopicPartition tp : partitions) {
>>             long pos = consumer.position(tp);
>>             lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>>
>> 55. "With this KIP, the only case in which this is possible is if the
>> consumer fetches from an offset earlier than the log start offset." Is
>> that
>> true? I guess a user could seek to a large offset without providing
>> leaderEpoch, which can cause the offset to be larger than the log end
>> offset during fetch?
>>
>> 56. In the schema for OffsetForLeaderEpochRequest, LeaderEpoch seems to be
>> an existing field. Is LeaderEpochQuery the new field? The name is not very
>> intuitive. It will be useful to document its meaning.
>>
>> 57. Should we deprecate the following api?
>> void seek(TopicPartition partition, long offset);
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Fri, Aug 3, 2018 at 9:32 AM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>> > Hey All,
>> >
>> > I think I've addressed all pending review. If there is no additional
>> > feedback, I'll plan to start a vote thread next week.
>> >
>> > Thanks,
>> > Jason
>> >
>> > On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > > Hey Jason,
>> > >
>> > > Thanks for your reply. I will comment below.
>> > >
>> > > Regarding 1, we probably can not simply rename both to `LeaderEpoch`
>> > > because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.
>> > >
>> > > Regarding 5, I don't feel strongly about this. I agree with the two
>> > > benefits of having two error codes: 1) not having to refresh metadata
>> > > when the consumer sees UNKNOWN_LEADER_EPOCH and 2) providing more
>> > > information in the log for debugging. Whether or not these two benefits
>> > > are useful enough to justify one more error code may be subjective. I
>> > > will let you and others determine this.
>> > >
>> > > Regarding 6, yeah overloading seek() looks good to me.
>> > >
>> > >
>> > > Thanks,
>> > > Dong
>> > >
>> > >
>> > > On Mon, Jul 30, 2018 at 9:33 AM, Jason Gustafson <ja...@confluent.io>
>> > > wrote:
>> > >
>> > > > Hey Dong,
>> > > >
>> > > > Thanks for the detailed review. Responses below:
>> > > >
>> > > > 1/2: Thanks for noticing the inconsistency. Would it be reasonable
>> to
>> > > > simply call it LeaderEpoch for both APIs?
>> > > >
>> > > > 3: I agree it should be a map. I will update.
>> > > >
>> > > > 4: Fair point. I think we should always be able to identify an
>> offset.
>> > > > Let's remove the Optional for now and reconsider if we find an
>> > unhandled
>> > > > case during implementation.
>> > > >
>> > > > 5: Yeah, I was thinking about this. The two error codes could be
>> > handled
>> > > > similarly, so we might merge them. Mainly I was thinking that it
>> will
>> > be
>> > > > useful for consumers/replicas to know whether they are ahead or
>> behind
>> > > the
>> > > > leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH, it
>> need
>> > not
>> > > > refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH error,
>> it
>> > > > could just stop fetching and await the LeaderAndIsr request that it
>> is
>> > > > missing. It probably also makes debugging a little bit easier. I
>> guess
>> > > I'm
>> > > > a bit inclined to keep both error codes, but I'm open to
>> > reconsideration
>> > > if
>> > > > you feel strongly. Another point to consider is whether we should
>> > > continue
>> > > > using NOT_LEADER_FOR_PARTITION if a follower receives an unexpected
>> > > fetch.
>> > > > The leader epoch would be different in this case so we could use
>> one of
>> > > the
>> > > > invalid epoch error codes instead since they contain more
>> information.
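>> > > >
>> > > > As a rough illustration of why the distinction is actionable on the
>> > > > client side (hypothetical handling, not the actual client code):
>> > > >
>> > > >     if (error == Errors.FENCED_LEADER_EPOCH) {
>> > > >         // our epoch is older than the leader's, so local metadata is
>> > > >         // stale: refresh metadata (a replica would instead wait for
>> > > >         // the LeaderAndIsr request it is missing)
>> > > >     } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
>> > > >         // the broker is behind our metadata: no refresh is needed,
>> > > >         // just retry the fetch after a backoff
>> > > >     }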
>> > > >
>> > > > 6: I agree the name is not ideal in that scenario. What if we
>> > overloaded
>> > > > `seek`?
>> > > >
>> > > > 7: Sure, I will mention this.
>> > > >
>> > > >
>> > > > Thanks,
>> > > > Jason
>> > > >
>> > > > On Fri, Jul 27, 2018 at 6:17 PM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hey Jason,
>> > > > >
>> > > > > Thanks for the update! I agree with the current proposal overall.
>> I
>> > > have
>> > > > > some minor comments related to naming etc.
>> > > > >
>> > > > > 1) I don't feel strongly about this and will just leave it here for
>> > > > > discussion. Would it be better to rename "CurrentLeaderEpoch" to
>> > > > > "ExpectedLeaderEpoch" for the new field in the
>> > > > > OffsetsForLeaderEpochRequest? The reason is that "CurrentLeaderEpoch"
>> > > > > may not necessarily be the true current leader epoch if the consumer
>> > > > > has stale metadata. "ExpectedLeaderEpoch" shows that this epoch is
>> > > > > what the consumer expects on the broker, which may or may not be the
>> > > > > true value.
>> > > > >
>> > > > > 2) Currently we add the field "LeaderEpoch" to FetchRequest and
>> the
>> > > field
>> > > > > "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that
>> both
>> > > > > fields are compared with the leaderEpoch in the broker, would it
>> be
>> > > > better
>> > > > > to give them the same name?
>> > > > >
>> > > > > 3) Currently LogTruncationException.truncationOffset() returns
>> > > > > Optional<OffsetAndMetadata> to user. Should it return
>> > > > > Optional<Map<TopicPartition, OffsetAndMetadata>> to handle the
>> > scenario
>> > > > > where leaderEpoch of multiple partitions are different from the
>> > > > leaderEpoch
>> > > > > in the broker?
>> > > > >
>> > > > > 4) Currently LogTruncationException.truncationOffset() returns an
>> > > > Optional
>> > > > > value. Could you explain a bit more when it will return
>> > > > Optional.empty()? I
>> > > > > am trying to understand whether it is simpler and reasonable to
>> > > > > replace Optional.empty()
>> > > > > with OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).
>> > > > >
>> > > > > 5) Do we also need to add a new retriable exception for error code
>> > > > > FENCED_LEADER_EPOCH? And do we need to define both
>> > FENCED_LEADER_EPOCH
>> > > > > and UNKNOWN_LEADER_EPOCH.
>> > > > > It seems that the current KIP uses these two error codes in the
>> same
>> > > way
>> > > > > and the exception for these two error codes is not exposed to the
>> > user.
>> > > > > Maybe we should combine them into one error, e.g.
>> > INVALID_LEADER_EPOCH?
>> > > > >
>> > > > > 6) For users who have turned off auto offset reset, when
>> > > > > consumer.poll() throws LogTruncationException, it seems that the user
>> > > > > will most likely call seekToCommitted(offset, leaderEpoch) where
>> > > > > offset and leaderEpoch are obtained from
>> > > > > LogTruncationException.truncationOffset(). In this case, the offset
>> > > > > used here is not a committed offset, which is inconsistent with the
>> > > > > method name seekToCommitted(...). Would it be better to rename the
>> > > > > method to e.g. seekToLastConsumedMessage()?
>> > > > >
>> > > > > 7) Per point 3 in Jun's comment, would it be useful to explicitly
>> > > specify
>> > > > > in the KIP that we will log the truncation event if user has
>> turned
>> > on
>> > > > auto
>> > > > > offset reset policy?
>> > > > >
>> > > > >
>> > > > > Thanks,
>> > > > > Dong
>> > > > >
>> > > > >
>> > > > > On Fri, Jul 27, 2018 at 12:39 PM, Jason Gustafson <
>> > ja...@confluent.io>
>> > > > > wrote:
>> > > > >
>> > > > > > Thanks Anna, you are right on both points. I updated the KIP.
>> > > > > >
>> > > > > > -Jason
>> > > > > >
>> > > > > > On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner <
>> a...@confluent.io>
>> > > > wrote:
>> > > > > >
>> > > > > > > Hi Jason,
>> > > > > > >
>> > > > > > > Thanks for the update. I agree with the current proposal.
>> > > > > > >
>> > > > > > > Two minor comments:
>> > > > > > > 1) In “API Changes” section, first paragraph says that “users
>> can
>> > > > catch
>> > > > > > the
>> > > > > > > more specific exception type and use the new `seekToNearest()`
>> > API
>> > > > > > defined
>> > > > > > > below.”. Since LogTruncationException “will include the
>> > partitions
>> > > > that
>> > > > > > > were truncated and the offset of divergence”, shouldn’t the
>> > client
>> > > > use
>> > > > > > > seek(offset) to seek to the offset of divergence in response
>> to
>> > the
>> > > > > > > exception?
>> > > > > > > 2) In “Protocol Changes” section, OffsetsForLeaderEpoch
>> > subsection
>> > > > says
>> > > > > > > “Note
>> > > > > > > that consumers will send a sentinel value (-1) for the current
>> > > epoch
>> > > > > and
>> > > > > > > the broker will simply disregard that validation.”. Is that
>> still
>> > > > true
>> > > > > > with
>> > > > > > > MetadataResponse containing leader epoch?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Anna
>> > > > > > >
>> > > > > > > On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson <
>> > > ja...@confluent.io>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi All,
>> > > > > > > >
>> > > > > > > > I have made some updates to the KIP. As many of you know, a
>> > side
>> > > > > > project
>> > > > > > > of
>> > > > > > > > mine has been specifying the Kafka replication protocol in
>> TLA.
>> > > You
>> > > > > can
>> > > > > > > > check out the code here if you are interested:
>> > > > > > > > https://github.com/hachikuji/kafka-specification. In
>> addition
>> > to
>> > > > > > > > uncovering
>> > > > > > > > a couple unknown bugs in the replication protocol (e.g.
>> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-7128), this has
>> > > helped
>> > > > > me
>> > > > > > > > validate the behavior in this KIP. In fact, the original
>> > version
>> > > I
>> > > > > > > proposed
>> > > > > > > > had a weakness. I initially suggested letting the leader
>> > validate
>> > > > the
>> > > > > > > > expected epoch at the fetch offset. This made sense for the
>> > > > consumer
>> > > > > in
>> > > > > > > the
>> > > > > > > > handling of unclean leader election, but it was not strong
>> > enough
>> > > > to
>> > > > > > > > protect the follower in all cases. In order to make
>> advancement
>> > > of
>> > > > > the
>> > > > > > > high
>> > > > > > > > watermark safe, for example, the leader actually needs to be
>> > sure
>> > > > > that
>> > > > > > > > every follower in the ISR matches its own epoch.
>> > > > > > > >
>> > > > > > > > I attempted to fix this problem by treating the epoch in the
>> > > fetch
>> > > > > > > request
>> > > > > > > > slightly differently for consumers and followers. For
>> > consumers,
>> > > it
>> > > > > > would
>> > > > > > > > be the expected epoch of the record at the fetch offset, and
>> > the
>> > > > > leader
>> > > > > > > > would raise a LOG_TRUNCATION error if the expectation
>> failed.
>> > For
>> > > > > > > > followers, it would be the current epoch and the leader
>> would
>> > > > require
>> > > > > > > that
>> > > > > > > > it match its own epoch. This was unsatisfying both because
>> of
>> > the
>> > > > > > > > inconsistency in behavior and because the consumer was left
>> > with
>> > > > the
>> > > > > > > weaker
>> > > > > > > > fencing that we already knew was insufficient for the
>> replicas.
>> > > > > > > Ultimately
>> > > > > > > > I decided that we should make the behavior consistent and
>> that
>> > > > meant
>> > > > > > that
>> > > > > > > > the consumer needed to act more like a following replica.
>> > Instead
>> > > > of
>> > > > > > > > checking for truncation while fetching, the consumer should
>> > check
>> > > > for
>> > > > > > > > truncation after leader changes. After checking for
>> truncation,
>> > > the
>> > > > > > > > consumer can then use the current epoch when fetching and
>> get
>> > the
>> > > > > > > stronger
>> > > > > > > > protection that it provides. What this means is that the
>> > Metadata
>> > > > API
>> > > > > > > must
>> > > > > > > > include the current leader epoch. Given the problems we have
>> > had
>> > > > > around
>> > > > > > > > stale metadata and how challenging they have been to debug,
>> I'm
>> > > > > > convinced
>> > > > > > > > that this is a good idea in any case and it resolves the
>> > > > inconsistent
>> > > > > > > > behavior in the Fetch API. The downside is that there will
>> be
>> > > some
>> > > > > > > > additional overhead upon leader changes, but I don't think
>> it
>> > is
>> > > a
>> > > > > > major
>> > > > > > > > concern since leader changes are rare and the
>> > > OffsetForLeaderEpoch
>> > > > > > > request
>> > > > > > > > is cheap.
>> > > > > > > >
>> > > > > > > > This approach leaves the door open for some interesting
>> follow
>> > up
>> > > > > > > > improvements. For example, now that we have the leader
>> epoch in
>> > > the
>> > > > > > > > Metadata request, we can implement similar fencing for the
>> > > Produce
>> > > > > API.
>> > > > > > > And
>> > > > > > > > now that the consumer can reason about truncation, we could
>> > > > consider
>> > > > > > > having
>> > > > > > > > a configuration to expose records beyond the high watermark.
>> > This
>> > > > > would
>> > > > > > > let
>> > > > > > > > users trade lower end-to-end latency for weaker durability
>> > > > semantics.
>> > > > > > It
>> > > > > > > is
>> > > > > > > > sort of like having an acks=0 option for the consumer.
>> Neither
>> > of
>> > > > > these
>> > > > > > > > options are included in this KIP, I am just mentioning them
>> as
>> > > > > > potential
>> > > > > > > > work for the future.
>> > > > > > > >
>> > > > > > > > Finally, based on the discussion in this thread, I have
>> added
>> > the
>> > > > > > > > seekToCommitted API for the consumer. Please take a look and
>> > let
>> > > me
>> > > > > > know
>> > > > > > > > what you think.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Jason
>> > > > > > > >
>> > > > > > > > On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang <
>> > > wangg...@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi Jason,
>> > > > > > > > >
>> > > > > > > > > The proposed API seems reasonable to me too. Could you please
>> > > > > > > > > also update the wiki page (
>> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
>> > > > > > > > > with a section, say "workflow", on how the proposed API will
>> > > > > > > > > be used together with the others to:
>> > > > > > > > >
>> > > > > > > > > 1. consumer callers handling a LogTruncationException.
>> > > > > > > > > 2. consumer internals for handling a retriable
>> > > > > > > > UnknownLeaderEpochException.
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > Guozhang
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner <
>> > > > a...@confluent.io>
>> > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jason,
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > I also like your proposal and agree that
>> > > > > > > > KafkaConsumer#seekToCommitted()
>> > > > > > > > > > is
>> > > > > > > > > > more intuitive as a way to initialize both consumer's
>> > > position
>> > > > > and
>> > > > > > > its
>> > > > > > > > > > fetch state.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > My understanding is that KafkaConsumer#seekToCommitted()
>> > > > > > > > > > is purely for clients who store their offsets externally,
>> > > > > > > > > > right? And we are still going to add
>> > > > > > > > > > KafkaConsumer#findOffsets() in this KIP as we discussed,
>> > > > > > > > > > so that the client can handle LogTruncationException?
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Thanks,
>> > > > > > > > > >
>> > > > > > > > > > Anna
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Thu, Jul 12, 2018 at 3:57 PM Dong Lin <
>> > > lindon...@gmail.com>
>> > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hey Jason,
>> > > > > > > > > > >
>> > > > > > > > > > > It is a great summary. The solution sounds good. I
>> might
>> > > have
>> > > > > > minor
>> > > > > > > > > > > comments regarding the method name. But we can discuss
>> > > > > > > > > > > those minor points later, after we reach consensus on
>> > > > > > > > > > > the high-level API.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > > Dong
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson <
>> > > > > > > > ja...@confluent.io>
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hey Anna and Dong,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks a lot for the great discussion. I've been
>> > hanging
>> > > > > back a
>> > > > > > > bit
>> > > > > > > > > > > because
>> > > > > > > > > > > > honestly the best option hasn't seemed clear. I
>> agree
>> > > with
>> > > > > > Anna's
>> > > > > > > > > > general
>> > > > > > > > > > > > observation that there is a distinction between the
>> > > > position
>> > > > > of
>> > > > > > > the
>> > > > > > > > > > > > consumer and its fetch state up to that position. If
>> > you
>> > > > > think
>> > > > > > > > about
>> > > > > > > > > > it,
>> > > > > > > > > > > a
>> > > > > > > > > > > > committed offset actually represents both of these.
>> The
>> > > > > > metadata
>> > > > > > > is
>> > > > > > > > > > used
>> > > > > > > > > > > to
>> > > > > > > > > > > > initialize the state of the consumer application and
>> > the
>> > > > > offset
>> > > > > > > > > > > initializes
>> > > > > > > > > > > > the position. Additionally, we are extending the
>> offset
>> > > > > commit
>> > > > > > in
>> > > > > > > > > this
>> > > > > > > > > > > KIP
>> > > > > > > > > > > > to also include the last epoch fetched by the
>> consumer,
>> > > > which
>> > > > > > is
>> > > > > > > > used
>> > > > > > > > > > to
>> > > > > > > > > > > > initialize the internal fetch state. Of course if
>> you
>> > do
>> > > an
>> > > > > > > > arbitrary
>> > > > > > > > > > > > `seek` and immediately commit offsets, then there
>> won't
>> > > be
>> > > > a
>> > > > > > last
>> > > > > > > > > epoch
>> > > > > > > > > > > to
>> > > > > > > > > > > > commit. This seems intuitive since there is no fetch
>> > > state
>> > > > in
>> > > > > > > this
>> > > > > > > > > > case.
>> > > > > > > > > > > We
>> > > > > > > > > > > > only commit fetch state when we have it.
>> > > > > > > > > > > >
>> > > > > > > > > > > > So if we think about a committed offset as
>> initializing
>> > > > both
>> > > > > > the
>> > > > > > > > > > > consumer's
>> > > > > > > > > > > > position and its fetch state, then the gap in the
>> API
>> > is
>> > > > > > > evidently
>> > > > > > > > > that
>> > > > > > > > > > > we
>> > > > > > > > > > > > don't have a way to initialize the consumer to a
>> > > committed
>> > > > > > > offset.
>> > > > > > > > We
>> > > > > > > > > > do
>> > > > > > > > > > > it
>> > > > > > > > > > > > implicitly of course for offsets stored in Kafka,
>> but
>> > > since
>> > > > > > > > external
>> > > > > > > > > > > > storage is a use case we support, then we should
>> have
>> > an
>> > > > > > explicit
>> > > > > > > > API
>> > > > > > > > > > as
>> > > > > > > > > > > > well. Perhaps something like this:
>> > > > > > > > > > > >
>> > > > > > > > > > > > seekToCommitted(TopicPartition, OffsetAndMetadata)
>> > > > > > > > > > > >
>> > > > > > > > > > > > In this KIP, we are proposing to allow the
>> > > > > `OffsetAndMetadata`
>> > > > > > > > object
>> > > > > > > > > > to
>> > > > > > > > > > > > include the leader epoch, so I think this would have
>> > the
>> > > > same
>> > > > > > > > effect
>> > > > > > > > > as
>> > > > > > > > > > > > Anna's suggested `seekToRecord`. But perhaps it is a
>> > more
>> > > > > > natural
>> > > > > > > > fit
>> > > > > > > > > > > given
>> > > > > > > > > > > > the current API? Furthermore, if we find a need for
>> > > > > additional
>> > > > > > > > > metadata
>> > > > > > > > > > > in
>> > > > > > > > > > > > the offset commit API in the future, then we will
>> just
>> > > need
>> > > > > to
>> > > > > > > > modify
>> > > > > > > > > > the
>> > > > > > > > > > > > `OffsetAndMetadata` object and we will not need a
>> new
>> > > > `seek`
>> > > > > > API.
>> > > > > > > > > > > >
>> > > > > > > > > > > > With this approach, I think then we can leave the
>> > > > `position`
>> > > > > > API
>> > > > > > > as
>> > > > > > > > > it
>> > > > > > > > > > > is.
>> > > > > > > > > > > > The position of the consumer is still just the next
>> > > > expected
>> > > > > > > fetch
>> > > > > > > > > > > offset.
>> > > > > > > > > > > > If a user needs to record additional state based on
>> > > > previous
>> > > > > > > fetch
>> > > > > > > > > > > > progress, then they would use the result of the
>> > previous
>> > > > > fetch
>> > > > > > to
>> > > > > > > > > > obtain
>> > > > > > > > > > > > it. This makes the dependence on fetch progress
>> > > explicit. I
>> > > > > > think
>> > > > > > > > we
>> > > > > > > > > > > could
>> > > > > > > > > > > > make this a little more convenient with a helper in
>> > the
>> > > > > > > > > > > `ConsumerRecords`
>> > > > > > > > > > > > object, but I think that's more of a nice-to-have.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thoughts?
>> > > > > > > > > > > >
>> > > > > > > > > > > > By the way, I have been iterating a little bit on
>> the
>> > > > replica
>> > > > > > > side
>> > > > > > > > of
>> > > > > > > > > > > this
>> > > > > > > > > > > > KIP. My initial proposal in fact did not have strong
>> > > enough
>> > > > > > > fencing
>> > > > > > > > > to
>> > > > > > > > > > > > protect all of the edge cases. I believe the current
>> > > > proposal
>> > > > > > > fixes
>> > > > > > > > > the
>> > > > > > > > > > > > problems, but I am still verifying the model.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > Jason
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin <
>> > > > > > lindon...@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hey Anna,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks much for the explanation. Approach 1 also
>> > sounds
>> > > > > good
>> > > > > > to
>> > > > > > > > > me. I
>> > > > > > > > > > > > think
>> > > > > > > > > > > > > findOffsets() is useful for users who don't use
>> > > automatic
>> > > > > > > offset
>> > > > > > > > > > reset
>> > > > > > > > > > > > > policy.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Just one more question. Since users who store
>> offsets
>> > > > > > > externally
>> > > > > > > > > need
>> > > > > > > > > > > to
>> > > > > > > > > > > > > provide leaderEpoch to findOffsets(...), do we
>> need
>> > an
>> > > > > extra
>> > > > > > > API
>> > > > > > > > > for
>> > > > > > > > > > > user
>> > > > > > > > > > > > > to get both offset and leaderEpoch, e.g.
>> > > > recordPosition()?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > Dong
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner <
>> > > > > > > > a...@confluent.io>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi Dong,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > What I called “not covering all use cases” is
>> what
>> > > you
>> > > > > call
>> > > > > > > > > > > best-effort
>> > > > > > > > > > > > > > (not guaranteeing some corner cases). I think we
>> > are
>> > > on
>> > > > > the
>> > > > > > > > same
>> > > > > > > > > > page
>> > > > > > > > > > > > > here.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > I wanted to be clear in the API whether the
>> > consumer
>> > > > > seeks
>> > > > > > > to a
>> > > > > > > > > > > > position
>> > > > > > > > > > > > > > (offset) or to a record (offset, leader epoch).
>> The
>> > > > only
>> > > > > > > > use-case
>> > > > > > > > > > of
>> > > > > > > > > > > > > > seeking to a record is seeking to a committed
>> > offset
>> > > > for
>> > > > > a
>> > > > > > > user
>> > > > > > > > > who
>> > > > > > > > > > > > > stores
>> > > > > > > > > > > > > > committed offsets externally. (Unless users find
>> > some
>> > > > > other
>> > > > > > > > > reason
>> > > > > > > > > > to
>> > > > > > > > > > > > > seek
>> > > > > > > > > > > > > > to a record.) I thought it was possible to
>> provide
>> > > this
>> > > > > > > > > > functionality
>> > > > > > > > > > > > > with
>> > > > > > > > > > > > > > findOffset(offset, leader epoch) followed by a
>> > > > > > seek(offset).
>> > > > > > > > > > However,
>> > > > > > > > > > > > you
>> > > > > > > > > > > > > > are right that this will not handle the race
>> > > condition
>> > > > > > where
>> > > > > > > > > > > > > non-divergent
>> > > > > > > > > > > > > > offset found by findOffset() could change again
>> > > before
>> > > > > the
>> > > > > > > > > consumer
>> > > > > > > > > > > > does
>> > > > > > > > > > > > > > the first fetch.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Regarding position() — if we add position that
>> > > returns
>> > > > > > > (offset,
>> > > > > > > > > > > leader
>> > > > > > > > > > > > > > epoch), this is specifically a position after a
>> > > record
>> > > > > that
>> > > > > > > was
>> > > > > > > > > > > > actually
>> > > > > > > > > > > > > > consumed or position of a committed record. In
>> > which
>> > > > > case,
>> > > > > > I
>> > > > > > > > > still
>> > > > > > > > > > > > think
>> > > > > > > > > > > > > > it’s cleaner to get a record position of
>> consumed
>> > > > message
>> > > > > > > from
>> > > > > > > > a
>> > > > > > > > > > new
>> > > > > > > > > > > > > helper
>> > > > > > > > > > > > > > method in ConsumerRecords() or from committed
>> > > offsets.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > I think all the use-cases could be then covered
>> > with:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > (Approach 1)
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > seekToRecord(offset, leaderEpoch) — this will
>> just
>> > > > > > > > initialize/set
>> > > > > > > > > > the
>> > > > > > > > > > > > > > consumer state;
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > findOffsets(offset, leaderEpoch) returns
>> {offset,
>> > > > > > > leaderEpoch}
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > If we agree that the race condition is also a
>> > corner
>> > > > > case,
>> > > > > > > > then I
>> > > > > > > > > > > think
>> > > > > > > > > > > > > we
>> > > > > > > > > > > > > > can cover use-cases with:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > (Approach 2)
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > findOffsets(offset, leaderEpoch) returns offset
>> —
>> > we
>> > > > > still
>> > > > > > > want
>> > > > > > > > > > > leader
>> > > > > > > > > > > > > > epoch as a parameter for the users who store
>> their
>> > > > > > committed
>> > > > > > > > > > offsets
>> > > > > > > > > > > > > > externally.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > I am actually now leaning more to approach 1,
>> since
>> > > it
>> > > > is
>> > > > > > > more
>> > > > > > > > > > > > explicit,
>> > > > > > > > > > > > > > and maybe there are more use cases for it.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Anna
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <
>> > > > > > > lindon...@gmail.com>
>> > > > > > > > > > > wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Hey Anna,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Thanks for the comment. To answer your
>> question,
>> > it
>> > > > > seems
>> > > > > > > > that
>> > > > > > > > > we
>> > > > > > > > > > > can
>> > > > > > > > > > > > > > cover
>> > > > > > > > > > > > > > > all cases in this KIP. As stated in "Consumer
>> > > > Handling"
>> > > > > > > > section,
>> > > > > > > > > > > > KIP-101
>> > > > > > > > > > > > > > > based approach will be used to derive the
>> > > truncation
>> > > > > > offset
>> > > > > > > > > from
>> > > > > > > > > > > the
>> > > > > > > > > > > > > > > 2-tuple (offset, leaderEpoch). This approach
>> is
>> > > best
>> > > > > > effort
>> > > > > > > > and
>> > > > > > > > > > it
>> > > > > > > > > > > is
>> > > > > > > > > > > > > > > inaccurate only in very rare scenarios (as
>> > > described
>> > > > in
>> > > > > > > > > KIP-279).
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > By using seek(offset, leaderEpoch), consumer
>> will
>> > > > still
>> > > > > > be
>> > > > > > > > able
>> > > > > > > > > > to
>> > > > > > > > > > > > > follow
>> > > > > > > > > > > > > > > this best-effort approach to detect log
>> > truncation
>> > > > and
>> > > > > > > > > determine
>> > > > > > > > > > > the
>> > > > > > > > > > > > > > > truncation offset. On the other hand, if we
>> use
>> > > > > > > seek(offset),
>> > > > > > > > > > > > consumer
>> > > > > > > > > > > > > > will
>> > > > > > > > > > > > > > > not detect log truncation in some cases which
>> > > weakens
>> > > > > the
>> > > > > > > > > > guarantee
>> > > > > > > > > > > > of
>> > > > > > > > > > > > > > this
>> > > > > > > > > > > > > > > KIP. Does this make sense?
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > Dong
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner
>> <
>> > > > > > > > > a...@confluent.io
>> > > > > > > > > > >
>> > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Sorry, I hit "send" before finishing.
>> > > Continuing...
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > 2) Hiding most of the consumer handling log
>> > > > > truncation
>> > > > > > > > logic
>> > > > > > > > > > with
>> > > > > > > > > > > > > > minimal
>> > > > > > > > > > > > > > > > exposure in KafkaConsumer API.  I was
>> proposing
>> > > > this
>> > > > > > > path.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Before answering your specific questions… I
>> > want
>> > > to
>> > > > > > > answer
>> > > > > > > > to
>> > > > > > > > > > > your
>> > > > > > > > > > > > > > > comment
>> > > > > > > > > > > > > > > > “In general, maybe we should discuss the
>> final
>> > > > > solution
>> > > > > > > > that
>> > > > > > > > > > > covers
>> > > > > > > > > > > > > all
>> > > > > > > > > > > > > > > > cases?”. With current KIP, we don’t cover
>> all
>> > > cases
>> > > > > of
>> > > > > > > > > consumer
>> > > > > > > > > > > > > > detecting
>> > > > > > > > > > > > > > > > log truncation because the KIP proposes a
>> > leader
>> > > > > epoch
>> > > > > > > > cache
>> > > > > > > > > in
>> > > > > > > > > > > > > > consumer
>> > > > > > > > > > > > > > > > that does not persist across restarts.
>> Plus, we
>> > > > only
>> > > > > > > store
>> > > > > > > > > last
>> > > > > > > > > > > > > > committed
>> > > > > > > > > > > > > > > > offset (either internally or users can store
>> > > > > > externally).
>> > > > > > > > > This
>> > > > > > > > > > > has
>> > > > > > > > > > > > a
>> > > > > > > > > > > > > > > > limitation that the consumer will not
>> always be
>> > > > able
>> > > > > to
>> > > > > > > > find
>> > > > > > > > > > > point
>> > > > > > > > > > > > of
>> > > > > > > > > > > > > > > > truncation just because we have a limited
>> > history
>> > > > > (just
>> > > > > > > one
>> > > > > > > > > > data
>> > > > > > > > > > > > > > point).
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > So, maybe we should first agree on whether
>> we
>> > > > accept
>> > > > > > that
>> > > > > > > > > > storing
>> > > > > > > > > > > > > last
>> > > > > > > > > > > > > > > > committed offset/leader epoch has a
>> limitation
>> > > that
>> > > > > the
>> > > > > > > > > > consumer
>> > > > > > > > > > > > will
>> > > > > > > > > > > > > > not
>> > > > > > > > > > > > > > > > be able to detect log truncation in all
>> cases?
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Anna
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 2:20 PM Anna
>> Povzner <
>> > > > > > > > > > a...@confluent.io>
>> > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Hi Dong,
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Thanks for the follow up! I finally have
>> much
>> > > > more
>> > > > > > > clear
>> > > > > > > > > > > > > > understanding
>> > > > > > > > > > > > > > > of
>> > > > > > > > > > > > > > > > > where you are coming from.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > You are right. The success of
>> > > > > findOffsets()/finding a
>> > > > > > > > point
>> > > > > > > > > > of
>> > > > > > > > > > > > > > > > > non-divergence depends on whether we have
>> > > enough
>> > > > > > > entries
>> > > > > > > > in
>> > > > > > > > > > the
>> > > > > > > > > > > > > > > > consumer's
>> > > > > > > > > > > > > > > > > leader epoch cache. However, I think this
>> is
>> > a
>> > > > > > > > fundamental
>> > > > > > > > > > > > > limitation
>> > > > > > > > > > > > > > > of
>> > > > > > > > > > > > > > > > > having a leader epoch cache that does not
>> > > persist
>> > > > > > > across
>> > > > > > > > > > > consumer
>> > > > > > > > > > > > > > > > restarts.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > If we consider the general case where
>> > consumer
>> > > > may
>> > > > > or
>> > > > > > > may
>> > > > > > > > > not
>> > > > > > > > > > > > have
>> > > > > > > > > > > > > > this
>> > > > > > > > > > > > > > > > > cache, then I see two paths:
>> > > > > > > > > > > > > > > > > 1) Letting the user to track the leader
>> epoch
>> > > > > history
>> > > > > > > > > > > externally,
>> > > > > > > > > > > > > and
>> > > > > > > > > > > > > > > > have
>> > > > > > > > > > > > > > > > > more exposure to leader epoch and finding
>> > point
>> > > > of
>> > > > > > > > > > > non-divergence
>> > > > > > > > > > > > > in
>> > > > > > > > > > > > > > > > > KafkaConsumer API. I understand this is
>> the
>> > > case
>> > > > > you
>> > > > > > > were
>> > > > > > > > > > > talking
>> > > > > > > > > > > > > > > about.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin
>> <
>> > > > > > > > > > lindon...@gmail.com
>> > > > > > > > > > > >
>> > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >> Hey Anna,
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> Thanks much for your detailed explanation
>> > and
>> > > > > > example!
>> > > > > > > > It
>> > > > > > > > > > does
>> > > > > > > > > > > > > help
>> > > > > > > > > > > > > > me
>> > > > > > > > > > > > > > > > >> understand the difference between our
>> > > > > understanding.
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> So it seems that the solution based on
>> > > > > findOffsets()
>> > > > > > > > > > currently
>> > > > > > > > > > > > > > focuses
>> > > > > > > > > > > > > > > > >> mainly on the scenario that consumer has
>> > > cached
>> > > > > > > > > leaderEpoch
>> > > > > > > > > > ->
>> > > > > > > > > > > > > > offset
>> > > > > > > > > > > > > > > > >> mapping whereas I was thinking about the
>> > > general
>> > > > > > case
>> > > > > > > > > where
>> > > > > > > > > > > > > consumer
>> > > > > > > > > > > > > > > may
>> > > > > > > > > > > > > > > > >> or
>> > > > > > > > > > > > > > > > >> may not have this cache. I guess that is
>> why
>> > > we
>> > > > > have
>> > > > > > > > > > different
>> > > > > > > > > > > > > > > > >> understanding here. I have some comments
>> > > below.
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> 3) The proposed solution using
>> > > > findOffsets(offset,
>> > > > > > > > > > > leaderEpoch)
>> > > > > > > > > > > > > > > followed
>> > > > > > > > > > > > > > > > >> by
>> > > > > > > > > > > > > > > > >> seek(offset) works if consumer has the
>> > cached
>> > > > > > > > leaderEpoch
>> > > > > > > > > ->
>> > > > > > > > > > > > > offset
>> > > > > > > > > > > > > > > > >> mapping. But if we assume consumer has
>> this
>> > > > cache,
>> > > > > > do
>> > > > > > > we
>> > > > > > > > > > need
>> > > > > > > > > > > to
>> > > > > > > > > > > > > > have
>> > > > > > > > > > > > > > > > >> leaderEpoch in the findOffsets(...)?
>> > > > Intuitively,
>> > > > > > the
>> > > > > > > > > > > > > > > > findOffsets(offset)
>> > > > > > > > > > > > > > > > >> can also derive the leaderEpoch using
>> offset
>> > > > just
>> > > > > > like
>> > > > > > > > the
>> > > > > > > > > > > > > proposed
>> > > > > > > > > > > > > > > > >> solution does with seek(offset).
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> 4) If consumer does not have cached
>> > > leaderEpoch
>> > > > ->
>> > > > > > > > offset
>> > > > > > > > > > > > mapping,
>> > > > > > > > > > > > > > > which
>> > > > > > > > > > > > > > > > >> is
>> > > > > > > > > > > > > > > > >> the case if consumer is restarted on a
>> new
>> > > > > machine,
>> > > > > > > then
>> > > > > > > > > it
>> > > > > > > > > > is
>> > > > > > > > > > > > not
>> > > > > > > > > > > > > > > clear
>> > > > > > > > > > > > > > > > >> what leaderEpoch would be included in the
>> > > > > > FetchRequest
>> > > > > > > > if
>> > > > > > > > > > > > consumer
>> > > > > > > > > > > > > > > does
>> > > > > > > > > > > > > > > > >> seek(offset). This is the case that
>> > motivates
>> > > > the
>> > > > > > > first
>> > > > > > > > > > > question
>> > > > > > > > > > > > > of
>> > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > >> previous email. In general, maybe we
>> should
>> > > > > discuss
>> > > > > > > the
>> > > > > > > > > > final
>> > > > > > > > > > > > > > solution
>> > > > > > > > > > > > > > > > >> that
>> > > > > > > > > > > > > > > > >> covers all cases?
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> 5) The second question in my previous
>> email
>> > is
>> > > > > > related
>> > > > > > > > to
>> > > > > > > > > > the
>> > > > > > > > > > > > > > > following
>> > > > > > > > > > > > > > > > >> paragraph:
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> "... In some cases, offsets returned from
>> > > > > position()
>> > > > > > > > could
>> > > > > > > > > > be
>> > > > > > > > > > > > > actual
>> > > > > > > > > > > > > > > > >> consumed messages by this consumer
>> > identified
>> > > by
>> > > > > > > > {offset,
>> > > > > > > > > > > leader
>> > > > > > > > > > > > > > > epoch}.
>> > > > > > > > > > > > > > > > >> In
>> > > > > > > > > > > > > > > > >> other cases, position() returns offset
>> that
>> > > was
>> > > > > not
>> > > > > > > > > actually
>> > > > > > > > > > > > > > consumed.
>> > > > > > > > > > > > > > > > >> Suppose, the user calls position() for
>> the
>> > > last
>> > > > > > > > > offset...".
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> I guess my point is that, if user calls
>> > > > position()
>> > > > > > for
>> > > > > > > > the
>> > > > > > > > > > > last
>> > > > > > > > > > > > > > offset
>> > > > > > > > > > > > > > > > and
>> > > > > > > > > > > > > > > > >> uses that offset in seek(...), then user
>> can
>> > > > > > probably
>> > > > > > > > just
>> > > > > > > > > > > call
>> > > > > > > > > > > > > > > > >> Consumer#seekToEnd() without calling
>> > > position()
>> > > > > and
>> > > > > > > > > > seek(...).
>> > > > > > > > > > > > > > > Similarly
>> > > > > > > > > > > > > > > > >> user can call Consumer#seekToBeginning()
>> to
>> > > the
>> > > > > seek
>> > > > > > > to
>> > > > > > > > > the
>> > > > > > > > > > > > > earliest
>> > > > > > > > > > > > > > > > >> position without calling position() and
>> > > > seek(...).
>> > > > > > > Thus
>> > > > > > > > > > > > position()
>> > > > > > > > > > > > > > > only
>> > > > > > > > > > > > > > > > >> needs to return the actual consumed
>> messages
>> > > > > > > identified
>> > > > > > > > by
>> > > > > > > > > > > > > {offset,
>> > > > > > > > > > > > > > > > leader
>> > > > > > > > > > > > > > > > >> epoch}. Does this make sense?
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> Thanks,
>> > > > > > > > > > > > > > > > >> Dong
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> On Mon, Jul 9, 2018 at 6:47 PM, Anna
>> > Povzner <
>> > > > > > > > > > > a...@confluent.io
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > > > >> > Hi Dong,
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > Thanks for considering my suggestions.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > Based on your comments, I realized
>> that my
>> > > > > > > suggestion
>> > > > > > > > > was
>> > > > > > > > > > > not
>> > > > > > > > > > > > > > > complete
>> > > > > > > > > > > > > > > > >> with
>> > > > > > > > > > > > > > > > >> > regard to KafkaConsumer API vs.
>> > > > consumer-broker
>> > > > > > > > > protocol.
>> > > > > > > > > > > > While
>> > > > > > > > > > > > > I
>> > > > > > > > > > > > > > > > >> propose
>> > > > > > > > > > > > > > > > >> > to keep KafkaConsumer#seek() unchanged
>> and
>> > > > take
>> > > > > > > offset
>> > > > > > > > > > only,
>> > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > >> underlying
>> > > > > > > > > > > > > > > > >> > consumer will send the next
>> FetchRequest()
>> > > to
>> > > > > > broker
>> > > > > > > > > with
>> > > > > > > > > > > > offset
>> > > > > > > > > > > > > > and
>> > > > > > > > > > > > > > > > >> > leaderEpoch if it is known (based on
>> > leader
>> > > > > epoch
>> > > > > > > > cache
>> > > > > > > > > in
>> > > > > > > > > > > > > > > consumer) —
>> > > > > > > > > > > > > > > > >> note
>> > > > > > > > > > > > > > > > >> > that this is different from the current
>> > KIP,
>> > > > > which
>> > > > > > > > > > suggests
>> > > > > > > > > > > to
>> > > > > > > > > > > > > > > always
>> > > > > > > > > > > > > > > > >> send
>> > > > > > > > > > > > > > > > >> > unknown leader epoch after seek(). This
>> > way,
>> > > > if
>> > > > > > the
>> > > > > > > > > > consumer
>> > > > > > > > > > > > > and a
>> > > > > > > > > > > > > > > > >> broker
>> > > > > > > > > > > > > > > > >> > agreed on the point of non-divergence,
>> > which
>> > > > is
>> > > > > > some
>> > > > > > > > > > > {offset,
>> > > > > > > > > > > > > > > > >> leaderEpoch}
>> > > > > > > > > > > > > > > > >> > pair, the new leader which causes
>> another
>> > > > > > truncation
>> > > > > > > > > (even
>> > > > > > > > > > > > > further
>> > > > > > > > > > > > > > > > back)
>> > > > > > > > > > > > > > > > >> > will be able to detect new divergence
>> and
>> > > > > restart
>> > > > > > > the
>> > > > > > > > > > > process
>> > > > > > > > > > > > of
>> > > > > > > > > > > > > > > > finding
>> > > > > > > > > > > > > > > > >> > the new point of non-divergence. So, to
>> > > answer
>> > > > > > your
>> > > > > > > > > > > question,
>> > > > > > > > > > > > If
>> > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > >> > truncation happens just after the user
>> > calls
>> > > > > > > > > > > > > > > > >> > KafkaConsumer#findOffsets(offset,
>> > > > leaderEpoch)
>> > > > > > > > followed
>> > > > > > > > > > by
>> > > > > > > > > > > > > > > > seek(offset),
>> > > > > > > > > > > > > > > > >> > the user will not seek to the wrong
>> > position
>> > > > > > without
>> > > > > > > > > > knowing
>> > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > >> > truncation has happened, because the
>> > > consumer
>> > > > > will
>> > > > > > > get
>> > > > > > > > > > > another
>> > > > > > > > > > > > > > > > >> truncation
>> > > > > > > > > > > > > > > > >> > error, and seek again.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > I am afraid, I did not understand your
>> > > second
>> > > > > > > > question.
>> > > > > > > > > > Let
>> > > > > > > > > > > me
>> > > > > > > > > > > > > > > > >> summarize my
>> > > > > > > > > > > > > > > > >> > suggestions again, and then give an
>> > example
>> > > to
>> > > > > > > > hopefully
>> > > > > > > > > > > make
>> > > > > > > > > > > > my
>> > > > > > > > > > > > > > > > >> > suggestions more clear. Also, the last
>> > part
>> > > of
>> > > > > my
>> > > > > > > > > example
>> > > > > > > > > > > > shows
>> > > > > > > > > > > > > > how
>> > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > >> > use-case in your first question will
>> work.
>> > > If
>> > > > it
>> > > > > > > does
>> > > > > > > > > not
>> > > > > > > > > > > > answer
>> > > > > > > > > > > > > > > your
>> > > > > > > > > > > > > > > > >> > second question, would you mind
>> > clarifying?
>> > > I
>> > > > am
>> > > > > > > also
>> > > > > > > > > > > focusing
>> > > > > > > > > > > > > on
>> > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > >> case
>> > > > > > > > > > > > > > > > >> > of a consumer having enough entries in
>> the
>> > > > > cache.
>> > > > > > > The
>> > > > > > > > > case
>> > > > > > > > > > > of
>> > > > > > > > > > > > > > > > restarting
>> > > > > > > > > > > > > > > > >> > from committed offset either stored
>> > > externally
>> > > > > or
>> > > > > > > > > > internally
>> > > > > > > > > > > > > will
>> > > > > > > > > > > > > > > > >> probably
>> > > > > > > > > > > > > > > > >> > need to be discussed more.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > Let me summarize my suggestion again:
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > 1) KafkaConsumer#seek() and
>> > > > > > KafkaConsumer#position()
>> > > > > > > > > > remains
>> > > > > > > > > > > > > > > unchanged
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > 2) New KafkaConsumer#findOffsets()
>> takes
>> > > > > {offset,
>> > > > > > > > > > > leaderEpoch}
>> > > > > > > > > > > > > > pair
>> > > > > > > > > > > > > > > > per
>> > > > > > > > > > > > > > > > >> > topic partition and returns offset per
>> > topic
>> > > > > > > > partition.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > 3) FetchRequest() to broker after
>> > > > > > > KafkaConsumer#seek()
>> > > > > > > > > > will
>> > > > > > > > > > > > > > contain
>> > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > >> > offset set by seek and leaderEpoch that
>> > > > > > corresponds
>> > > > > > > to
>> > > > > > > > > the
>> > > > > > > > > > > > > offset
>> > > > > > > > > > > > > > > > based
>> > > > > > > > > > > > > > > > >> > on leader epoch cache in the consumer.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > The rest of this e-mail is a long and contrived example with several log truncations and unclean leader elections to illustrate the API and your first use-case. Suppose we have three brokers. Initially, Brokers A, B, and C each have one message at offset 0 with leader epoch 0. Then, Broker A goes down for some time. Broker B becomes a leader with epoch 1, and writes messages to offsets 1 and 2. Broker C fetches offset 1, but before fetching offset 2, becomes a leader with leader epoch 2 and writes a message at offset 2. Here is the state of the brokers at this point:
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > > Broker A:
>> > > > > > > > > > > > > > > > >> > > offset 0, epoch 0 <— leader
>> > > > > > > > > > > > > > > > >> > > goes down…
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > > Broker B:
>> > > > > > > > > > > > > > > > >> > > offset 0, epoch 0
>> > > > > > > > > > > > > > > > >> > > offset 1, epoch 1  <- leader
>> > > > > > > > > > > > > > > > >> > > offset 2, epoch 1
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > > Broker C:
>> > > > > > > > > > > > > > > > >> > > offset 0, epoch 0
>> > > > > > > > > > > > > > > > >> > > offset 1, epoch 1
>> > > > > > > > > > > > > > > > >> > > offset 2, epoch 2 <— leader
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > Before Broker C becomes a leader with leader epoch 2, the consumer consumed the following messages from broker A and broker B:
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2, leaderEpoch=1}.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > Consumer’s leader epoch cache at this point contains the following entries:
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > (leaderEpoch=0, startOffset=0)
>> > > > > > > > > > > > > > > > >> > (leaderEpoch=1, startOffset=1)
>> > > > > > > > > > > > > > > > >> > endOffset = 3
>> > > > > > > > > > > > > > > > >> >
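The cache state above is easier to follow with a concrete structure in mind. Below is a minimal sketch of the per-partition bookkeeping the example describes; the `EpochCache` name and its methods are invented for illustration and are not part of any existing or proposed consumer API.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch only: per-partition bookkeeping of consumed leader epochs.
final class EpochCache {
    // leaderEpoch -> start offset of that epoch, as observed from consumed records
    private final NavigableMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private long endOffset = 0L;

    // Record a consumed message and advance the end offset past it.
    void onConsumed(long offset, int leaderEpoch) {
        epochStartOffsets.putIfAbsent(leaderEpoch, offset);
        endOffset = offset + 1;
    }

    // The consumer's view of where the given epoch ends: the start offset of the
    // next higher epoch it has seen, or its overall end offset if there is none.
    long endOffsetForEpoch(int leaderEpoch) {
        Integer nextEpoch = epochStartOffsets.higherKey(leaderEpoch);
        return nextEpoch != null ? epochStartOffsets.get(nextEpoch) : endOffset;
    }
}
```

Feeding it the three consumed records above yields exactly the state listed: entries (0, 0) and (1, 1) with endOffset = 3, and endOffsetForEpoch(0) == 1, which comes up again later in the example.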
>> > > > > > > > > > > > > > > > >> > Then, broker B becomes the follower of broker C, truncates, and starts fetching from offset 2.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > The consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets a LOG_TRUNCATION error from broker C.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > In response, the client calls KafkaConsumer#findOffsets(offset=3, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker C responds with {leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3, leaderEpoch=1) returns offset=2.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > In response, the consumer calls KafkaConsumer#seek(offset=2) followed by poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to broker C.
>> > > > > > > > > > > > > > > > >> >
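For readers who prefer code, the sequence above looks roughly like this from the application's side. Note that LogTruncationException, findOffsets(), and OffsetAndEpoch are the proposals under discussion in this thread, not existing consumer APIs, so this is only a sketch of the intended usage against an already-assigned consumer.

```java
// Sketch of the recovery sequence above (proposed APIs, not current ones).
TopicPartition tp = new TopicPartition("topic", 0);
ConsumerRecords<byte[], byte[]> records;
try {
    // Sends FetchRequest(offset=3, leaderEpoch=1); broker C answers with LOG_TRUNCATION.
    records = consumer.poll(Duration.ofMillis(100));
} catch (LogTruncationException e) {
    // Ask where the logs diverge: OffsetsForLeaderEpoch(leaderEpoch=1) -> {leaderEpoch=1, endOffset=2}.
    Map<TopicPartition, Long> closest =
        consumer.findOffsets(Collections.singletonMap(tp, new OffsetAndEpoch(3L, 1)));
    consumer.seek(tp, closest.get(tp));              // seek(offset=2)
    records = consumer.poll(Duration.ofMillis(100)); // FetchRequest(offset=2, leaderEpoch=1)
}
```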
>> > > > > > > > > > > > > > > > >> > I will continue with this example with the goal of answering your first question about truncation just after findOffsets() followed by seek():
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > Suppose brokers B and C go down, and broker A comes up and becomes a leader with leader epoch 3, and writes a message to offset 1. Suppose this happens before the consumer gets the response from broker C to the previous fetch request: FetchRequest(offset=2, leaderEpoch=1).
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > The consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which returns a LOG_TRUNCATION error, because broker A has leader epoch 3 > the leader epoch in the FetchRequest, and that epoch's starting offset = 1 < offset 2 in the FetchRequest.
>> > > > > > > > > > > > > > > > >> >
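The comparison in that last sentence can be made a little more explicit. The sketch below is an illustration of the reasoning, not broker code: the leader looks up where the fetcher's epoch ends in its own history, and if that point is before the fetch offset, the fetcher has records the leader never had.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustration of the divergence check described above; not actual broker code.
final class TruncationCheck {
    static boolean divergesFromLeader(NavigableMap<Integer, Long> leaderEpochStartOffsets,
                                      long leaderEndOffset,
                                      int fetchEpoch,
                                      long fetchOffset) {
        // Where does the fetcher's epoch end on the leader? At the start of the next
        // higher epoch, or at the leader's end offset if that epoch is still current.
        Integer nextEpoch = leaderEpochStartOffsets.higherKey(fetchEpoch);
        long epochEndOffset = nextEpoch != null ? leaderEpochStartOffsets.get(nextEpoch) : leaderEndOffset;
        return epochEndOffset < fetchOffset;
    }

    public static void main(String[] args) {
        // Broker A's history after it returns: epoch 0 starts at offset 0, epoch 3 starts at offset 1.
        NavigableMap<Integer, Long> brokerA = new TreeMap<>();
        brokerA.put(0, 0L);
        brokerA.put(3, 1L);
        // FetchRequest(offset=2, leaderEpoch=1): epoch 1 ends at offset 1 on broker A, and 1 < 2,
        // so broker A reports LOG_TRUNCATION.
        System.out.println(divergesFromLeader(brokerA, 2L, 1, 2L)); // true
    }
}
```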
>> > > > > > > > > > > > > > > > >> > In response, the user calls KafkaConsumer#findOffsets(offset=2, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker A responds with {leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch = 0 in its cache with end offset == 1, which results in KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset = 1.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > In response, the user calls KafkaConsumer#seek(offset=1) followed by poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to broker A, which responds with the message at offset 1, leader epoch 3.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > I will think some more about consumers restarting from committed offsets, and send a follow up.
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > Thanks,
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > Anna
>> > > > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > > > >> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > > > >> > > Hey Anna,
>> > > > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > > > >> > > Thanks much for the thoughtful reply. It makes sense to differentiate between "seeking to a message" and "seeking to a position". I have two questions here:
>> > > > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > > > >> > > - For the "seeking to a message" use-case, with the proposed approach the user needs to call findOffset(offset, leaderEpoch) followed by seek(offset). If message truncation and message append happen immediately after findOffset(offset, leaderEpoch) but before seek(offset), it seems that the user will seek to the wrong message without knowing the truncation has happened. Would this be a problem?
>> > > > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > > > >> > > - For the "seeking to a position" use-case, it seems that there can be two positions, i.e. earliest and latest. So these two cases can be fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it seems that the user will only need to call position() and seek() for the "seeking to a message" use-case?
>> > > > > > > > > > > > > > > > >> > >
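Dong's first question is essentially about the gap between the two calls. Below is a sketch of that window, again using the proposed findOffsets() and OffsetAndEpoch names (which do not exist in the current API), with example values taken from the earlier walkthrough.

```java
long lastOffset = 3L;  // position and epoch reported with the truncation error
int lastEpoch = 1;

// The offset returned here is validated against the log as it exists right now...
long resolved = consumer.findOffsets(
        Collections.singletonMap(tp, new OffsetAndEpoch(lastOffset, lastEpoch))).get(tp);

// ...but if truncation plus new appends happen at this point, "resolved" may now
// name a different message.

consumer.seek(tp, resolved);            // plain seek() carries no leader epoch to re-validate,
consumer.poll(Duration.ofMillis(100));  // so the next fetch can succeed silently.
```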
>> > > > > > > > > > > > > > > > >> > > Thanks,
>> > > > > > > > > > > > > > > > >> > > Dong
>> > > > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > > > >> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io> wrote:
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > Hi Jason and Dong,
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > I’ve been thinking about your suggestions and discussion regarding position(), seek(), and the new proposed API.
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > Here is my thought process for why we should keep the position() and seek() APIs unchanged.
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > I think we should separate {offset, leader epoch}, which uniquely identifies a message, from an offset that is a position. In some cases, offsets returned from position() could be actual messages consumed by this consumer, identified by {offset, leader epoch}. In other cases, position() returns an offset that was not actually consumed. Suppose the user calls position() for the last offset. Suppose we return {offset, leader epoch} of the message currently in the log. Then, the message gets truncated before the consumer’s first poll(). It does not make sense for poll() to fail in this case, because the log truncation did not actually happen from the consumer’s perspective. On the other hand, as the KIP proposes, it makes sense for the committed() method to return {offset, leader epoch} because those offsets represent actual consumed messages.
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > The same argument applies to the seek() method: we are not seeking to a message, we are seeking to a position.
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > I like the proposal to add a KafkaConsumer#findOffsets() API. I am assuming something like:
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > Similar to seek() and position(), I think findOffsets() should return the offset without the leader epoch, because what we want is the offset that we think is closest to the non-divergent message, given the consumed message. Until the consumer actually fetches the message, we should not let the consumer store the leader epoch for a message it did not consume.
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > So, the workflow will be:
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > 1) The user gets LogTruncationException with {offset, leader epoch of the previous message} (whatever we send with the new FetchRecords request).
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > 2) offset = findOffsets(tp -> {offset, leader epoch})
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > 3) seek(offset)
>> > > > > > > > > > > > > > > > >> > > >
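Put together as code, the three steps above look something like the following. LogTruncationException, the accessor for the divergent positions on it, and findOffsets() are all part of the proposal rather than the current API, and the accessor name here is invented.

```java
// Generic form of the 1) exception, 2) findOffsets, 3) seek workflow (proposed APIs).
void recoverFromTruncation(LogTruncationException e) {
    // 1) The exception carries {offset, leader epoch} of the last consumed message per partition.
    Map<TopicPartition, OffsetAndEpoch> divergent = e.divergentOffsets();
    // 2) Translate each to the closest non-divergent offset.
    Map<TopicPartition, Long> closest = consumer.findOffsets(divergent);
    // 3) Seek to those offsets and continue polling.
    closest.forEach(consumer::seek);
}
```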
>> > > > > > > > > > > > > > > > >> > > > For the use-case where the users store committed offsets externally:
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > 1) Such users would have to track the leader epoch together with an offset. Otherwise, there is no way to detect later what leader epoch was associated with the message. I think it’s reasonable to ask that from users if they want to detect log truncation. Otherwise, they will get the current behavior.
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > If the users currently get an offset to be stored using position(), I see two possibilities. First, they save the offset returned from position(), which they call before poll(). In that case, it would not be correct to store {offset, leader epoch} if we changed position() to return {offset, leader epoch}, since the actual fetched message could be different (from the example I described earlier). So, it would be more correct to call position() after poll(). However, the user already gets ConsumerRecords at this point, from which the user can extract {offset, leader epoch} of the last message.
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > So, I like the idea of adding a helper method to ConsumerRecords, as Jason proposed, something like:
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is a data struct holding {offset, leader epoch}.
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > In this case, we would advise the user to follow the workflow: poll(), get {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(), save offset and leader epoch, process records.
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > 2) When the user needs to seek to the last committed offset, they call the new findOffsets(saved offset, leader epoch), and then seek(offset).
>> > > > > > > > > > > > > > > > >> > > >
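Below is a sketch of that external-storage flow, assuming the proposed ConsumerRecords helper and findOffsets(); offsetStore and process() stand in for whatever external storage and processing the application uses.

```java
// Commit side: poll, remember the last consumed {offset, leader epoch}, then process.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
OffsetAndEpoch last = records.lastOffsetWithLeaderEpoch();  // proposed helper (per partition in practice)
offsetStore.save(tp, last);                                 // user-managed external storage
process(records);

// Restart side: translate the stored {offset, leader epoch} into a safe position.
OffsetAndEpoch stored = offsetStore.load(tp);
long startOffset = consumer.findOffsets(Collections.singletonMap(tp, stored)).get(tp);
consumer.seek(tp, startOffset);
```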
>> > > > > > > > > > > > > > > > >> > > > What do you think?
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > Thanks,
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > Anna
>> > > > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > > > >> > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > > > >> > > > > Hey Jason,
>> > > > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > > > >> > > > > Thanks much for your thoughtful explanation.
>> > > > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > > > >> > > > > Yes, the solution using findOffsets(offset, leaderEpoch) also works. The advantage of this solution is that it adds only one API instead of two. The concern is that its usage seems a bit more clumsy for advanced users. More specifically, advanced users who store offsets externally will always need to call findOffsets() before calling seek(offset) during consumer initialization. And those advanced users will need to manually keep track of the leaderEpoch of the last ConsumerRecord.
>> > > > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > > > >> > > > > The other solution, which may be more user-friendly for advanced users, is to add two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) = offsetEpochs(topicPartition)`.
>> > > > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > > > >> > > > > I kind of prefer the second solution because it is easier to use for advanced users. If we need to expose leaderEpoch anyway to safely identify a message, it may be conceptually simpler to expose it directly in seek(...) rather than requiring one more translation using findOffsets(...). But I am also OK with the first solution if other developers also favor that one :)
>> > > > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > > > >> > > > > Thanks,
>> > > > > > > > > > > > > > > > >> > > > > Dong
>> > > > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > > > >> > > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io> wrote:
>> > > > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > Hi Dong,
>> > > > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > Thanks, I've been thinking about your suggestions a bit. It is challenging to make this work given the current APIs. One of the difficulties is that we don't have an API to find the leader epoch for a given offset at the moment. So if the user does a seek to offset 5, then we'll need a new API to find the corresponding epoch in order to fulfill the new position() API. Potentially we could modify ListOffsets to enable finding the leader epoch, but I am not sure it is worthwhile. Perhaps it is reasonable for advanced usage to expect that the epoch information, if needed, will be extracted from the records directly? It might make sense to expose a helper in `ConsumerRecords` to make this a little easier though.
>> > > > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > Alternatively, if we think it is important to have this information exposed directly, we could create batch APIs to solve the naming problem. For example:
>> > > > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > Map<TopicPartition, OffsetAndEpoch> positions();
>> > > > > > > > > > > > > > > > >> > > > > > void seek(Map<TopicPartition, OffsetAndEpoch> positions);
>> > > > > > > > > > > > > > > > >> > > > > >
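Both alternatives lean on an OffsetAndEpoch container. One possible shape, purely for illustration (the KIP may define it differently):

```java
// Simple immutable value type pairing an offset with the leader epoch of that record.
public final class OffsetAndEpoch {
    private final long offset;
    private final int leaderEpoch;

    public OffsetAndEpoch(long offset, int leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }

    public long offset() { return offset; }
    public int leaderEpoch() { return leaderEpoch; }

    @Override
    public String toString() {
        return "OffsetAndEpoch(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")";
    }
}
```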
>> > > > > > > > > > > > > > > > >> > > > > > However, I'm actually leaning toward leaving the seek() and position() APIs unchanged. Instead, we can add a new API to search for an offset by timestamp or by offset/leader epoch. Let's say we call it `findOffsets`. If the user hits a log truncation error, they can use this API to find the closest offset and then do a seek(). At the same time, we deprecate the `offsetsForTimes` APIs. We now have two use cases which require finding offsets, so I think we should make this API general and leave the door open for future extensions.
>> > > > > > > > > > > > > > > > >> > > > > >
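One way to read "make this API general" is a lookup keyed by a query object rather than a bare timestamp. The shape below is only a guess at what such a generalization could look like; none of these names are settled in the KIP.

```java
// Hypothetical sketch of a query type covering both lookup use cases (names invented).
public final class OffsetQuery {
    private final Long timestampMs;   // set for timestamp lookups
    private final Long offset;        // set together with leaderEpoch for divergence lookups
    private final Integer leaderEpoch;

    private OffsetQuery(Long timestampMs, Long offset, Integer leaderEpoch) {
        this.timestampMs = timestampMs;
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }

    // Search by timestamp (what offsetsForTimes covers today).
    public static OffsetQuery forTimestamp(long timestampMs) {
        return new OffsetQuery(timestampMs, null, null);
    }

    // Search for the closest non-divergent offset given a consumed {offset, leader epoch}.
    public static OffsetQuery forOffsetAndEpoch(long offset, int leaderEpoch) {
        return new OffsetQuery(null, offset, leaderEpoch);
    }
}

// A consumer-side signature in this style might then be:
// Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetQuery> queries);
```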
>> > > > > > > > > > > > > > > > >> > > > > > By the way, I'm unclear about the desire to move part of this functionality to AdminClient. Guozhang suggested this previously, but I think it only makes sense for cross-cutting capabilities such as topic creation. If we have an API which is primarily useful to consumers, then I think that's where it should be exposed. The AdminClient also has its own API integrity and should not become a dumping ground for advanced use cases. I'll update the KIP with the `findOffsets` API suggested above and we can see if it does a good enough job of keeping the API simple for common cases.
>> > > > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > Thanks,
>> > > > > > > > > > > > > > > > >> > > > > > Jason
>> > > > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > Hey Jason,
>> > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > Regarding seek(...), it seems that we want an API for the user to initialize the consumer with (offset, leaderEpoch), and that API should allow throwing PartitionTruncationException. Suppose we agree on this; then seekToNearest() is not sufficient because it will always swallow PartitionTruncationException. Here we have two options. The first option is to add an API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to offset. The second option is to add seek(offset, leaderEpoch). It seems that the second option may be simpler because it makes it clear that (offset, leaderEpoch) will be used to identify the consumer's position in a partition. And the user only needs to handle PartitionTruncationException from the poll(). In comparison, the first option seems a bit harder to use because the user also has to handle the PartitionTruncationException if offsetsForLeaderEpochs() returns a different offset from the user-provided offset. What do you think?
>> > > > > > > > > > > > > > > > >> > > > > > >
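The difference between the two options is easiest to see side by side. Both snippets use proposed names from this thread (findOffsets(), a seek(offset, leaderEpoch) overload, PartitionTruncationException), so treat them as sketches of the intended usage rather than working code.

```java
// Option 1: translate first, then seek; the caller must notice divergence itself.
long resolved = consumer.findOffsets(Collections.singletonMap(tp, stored)).get(tp);
if (resolved != stored.offset()) {
    // truncation detected: decide how to react before seeking
}
consumer.seek(tp, resolved);

// Option 2: seek with the epoch and let poll() surface divergence as an exception.
consumer.seek(tp, stored.offset(), stored.leaderEpoch());  // proposed overload
try {
    consumer.poll(Duration.ofMillis(100));
} catch (PartitionTruncationException e) {
    // truncation detected here instead
}
```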
>> > > > > > > > > > > > > > > > >> > > > > > > If we decide to add the API seek(offset, leaderEpoch), then we can decide whether and how to add an API to translate (offset, leaderEpoch) to offset. It seems that this API will be needed by advanced users who don't want auto offset reset (so that they can be notified) but still want to reset the offset to the closest one. For those users it probably makes sense to only have the API in AdminClient. offsetsForTimes() seems like a common API that will be needed by users of the consumer in general, so it may be more reasonable for it to stay in the consumer API. I don't have a strong opinion on whether offsetsForTimes() should be replaced by an API in AdminClient.
>> > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > Though (offset, leaderEpoch) is needed to uniquely identify a message in general, it is only needed for advanced users who have turned on unclean leader election, need to use seek(..), and don't want auto offset reset. Most other users probably just want to enable auto offset reset and store offsets in Kafka. Thus we might want to keep the existing offset-only APIs (e.g. seek() and position()) for most users while adding new APIs for advanced users. And yes, it seems that we need a new name for position().
>> > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > Though I think we need new APIs to carry the new information (e.g. leaderEpoch), I am not very sure how that should look. One possible option is the APIs in KIP-232. Another option is something like this:
>> > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > ```
>> > > > > > > > > > > > > > > > >> > > > > > > class OffsetEpochs {
>> > > > > > > > > > > > > > > > >> > > > > > >   long offset;
>> > > > > > > > > > > > > > > > >> > > > > > >   int leaderEpoch;
>> > > > > > > > > > > > > > > > >> > > > > > >   int partitionEpoch;   // This may be needed later as discussed in KIP-232
>> > > > > > > > > > > > > > > > >> > > > > > >   ... // Hopefully these are all we need to identify a message in Kafka. But if we need more then we can add new fields in this class.
>> > > > > > > > > > > > > > > > >> > > > > > > }
>> > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > OffsetEpochs offsetEpochs(TopicPartition);
>> > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > void seek(TopicPartition, OffsetEpochs);
>> > > > > > > > > > > > > > > > >> > > > > > > ```
>> > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > Thanks,
>> > > > > > > > > > > > > > > > >> > > > > > > Dong
>> > > > > > > > > > > > > > > > >> > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io> wrote:
>> > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > > Hey Dong,
>> > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > > Thanks for the feedback. The first three points are easy:
>> > > > > > > > > > > > > > > > >> > > > > > > >
>> > > > > > > > > > > > > > > > >> > > > > > > > 1. Yes, we should be consistent.
>> > > > > > > > > > > > > > > > >> > > > > > > > 2. Yes, I will add this.
>> > > > > > > > > > > > > > > > >> > > > > > > > 3. Yes, I think we should document the changes to the committed offset
