Hey Dong,

Thanks for the feedback. The first three points are easy:

1. Yes, we should be consistent.
2. Yes, I will add this.
3. Yes, I think we should document the changes to the committed offset
schema. I meant to do this, but it slipped my mind.

The latter questions are tougher. One option I was considering is to have
only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new
seek() API. That seems more consistent with the current use of
`offsetsForTimes` (we don't have a separate `seekToTimestamp` API). An
alternative might be to take a page from the AdminClient API and add a new
method to generalize offset lookup. For example, we could have
`lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes`
and this would open the door for future extensions without needing new APIs.
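
To make the idea concrete, here is a rough sketch of how a generalized lookup could look. Nothing below is in the KIP: `LookupOptions`, its `byTimestamp`/`byLeaderEpoch` factories, and the toy in-memory log are placeholders, and partitions are plain strings rather than `TopicPartition` so the example stands on its own. The epoch lookup assumes the semantics discussed earlier in this thread (start offset of the first larger epoch).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupOffsetsSketch {

    /** Hypothetical options object: what to search by, and the target per partition. */
    static final class LookupOptions {
        enum Kind { TIMESTAMP, LEADER_EPOCH }

        final Kind kind;
        final Map<String, Long> targets; // partition name -> timestamp or leader epoch

        private LookupOptions(Kind kind, Map<String, Long> targets) {
            this.kind = kind;
            this.targets = targets;
        }

        static LookupOptions byTimestamp(Map<String, Long> timestamps) {
            return new LookupOptions(Kind.TIMESTAMP, timestamps);
        }

        static LookupOptions byLeaderEpoch(Map<String, Long> epochs) {
            return new LookupOptions(Kind.LEADER_EPOCH, epochs);
        }
    }

    /** One record in the toy log; its offset is its index in the list. */
    static final class LogRecord {
        final long timestamp;
        final long leaderEpoch;

        LogRecord(long timestamp, long leaderEpoch) {
            this.timestamp = timestamp;
            this.leaderEpoch = leaderEpoch;
        }
    }

    private final Map<String, List<LogRecord>> logs = new HashMap<>();

    void append(String partition, long timestamp, long leaderEpoch) {
        logs.computeIfAbsent(partition, p -> new ArrayList<>())
            .add(new LogRecord(timestamp, leaderEpoch));
    }

    /** Single entry point; a new kind of lookup only extends LookupOptions. */
    Map<String, Long> lookupOffsets(LookupOptions options) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> target : options.targets.entrySet()) {
            List<LogRecord> log = logs.getOrDefault(target.getKey(), new ArrayList<>());
            // Toy simplification: fall back to the log end offset when nothing matches.
            long found = log.size();
            for (int offset = 0; offset < log.size(); offset++) {
                LogRecord r = log.get(offset);
                boolean match = options.kind == LookupOptions.Kind.TIMESTAMP
                        ? r.timestamp >= target.getValue()    // first record at or after the timestamp
                        : r.leaderEpoch > target.getValue();  // first record of a larger epoch
                if (match) {
                    found = offset;
                    break;
                }
            }
            result.put(target.getKey(), found);
        }
        return result;
    }

    public static void main(String[] args) {
        LookupOffsetsSketch log = new LookupOffsetsSketch();
        log.append("t0", 100L, 0L);
        log.append("t0", 200L, 0L);
        log.append("t0", 300L, 1L);
        System.out.println(log.lookupOffsets(
                LookupOptions.byTimestamp(Collections.singletonMap("t0", 150L))));
        System.out.println(log.lookupOffsets(
                LookupOptions.byLeaderEpoch(Collections.singletonMap("t0", 0L))));
    }
}
```

The point of the options object is that both the timestamp and the epoch lookup go through one method, so a later lookup kind would not need another consumer API.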

The case of position() is a little more annoying. It would have been better
had we let this return an object so that it is easier to extend. This is
the only reason I didn't add the API to the KIP. Maybe we should bite the
bullet and fix this now? Unfortunately we'll have to come up with a new
name. Maybe `currentPosition`?
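
As a rough illustration of why an object return type is easier to extend, here is a minimal sketch. The `Position` class and the toy consumer around it are hypothetical stand-ins, not the real `KafkaConsumer`, and partitions are plain strings to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CurrentPositionSketch {

    /** Hypothetical return type: fields can be added later without a new method. */
    static final class Position {
        final long offset;
        final Optional<Integer> leaderEpoch;

        Position(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    private final Map<String, Position> positions = new HashMap<>();

    /** Seek without an epoch, as callers do today. */
    void seek(String partition, long offset) {
        positions.put(partition, new Position(offset, Optional.empty()));
    }

    /** Seek with the leader epoch that was stored alongside the offset. */
    void seek(String partition, long offset, int leaderEpoch) {
        positions.put(partition, new Position(offset, Optional.of(leaderEpoch)));
    }

    /** Shape of the existing API: a bare long leaves no room for the epoch. */
    long position(String partition) {
        return currentPosition(partition).offset;
    }

    /** Object-returning variant: the epoch travels with the offset. */
    Position currentPosition(String partition) {
        Position p = positions.get(partition);
        if (p == null) {
            throw new IllegalStateException("No position for " + partition);
        }
        return p;
    }

    public static void main(String[] args) {
        CurrentPositionSketch consumer = new CurrentPositionSketch();
        consumer.seek("t0", 42L, 7);
        Position p = consumer.currentPosition("t0");
        System.out.println(p.offset + " @ epoch " + p.leaderEpoch.orElse(-1));
    }
}
```

A user storing offsets externally could then persist both fields from `currentPosition` and pass them back to the epoch-aware seek on restart.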

Thoughts?

-Jason


On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <lindon...@gmail.com> wrote:

> Regarding points 4) and 5) above, motivation for the alternative APIs is
> that, if we decide that leaderEpoch is equally important as offset in
> identifying a message, then it may be reasonable to always specify it
> wherever offset is currently required in the consumer API to identify a
> message, e.g. position(), seek(). For example, since we allow user to
> retrieve offset using position() instead of asking user to keep track of
> the offset of the latest ConsumerRecord, maybe it would be more consistent
> for the user to also retrieve leaderEpoch using position()?
>
>
>
> On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jason,
> >
> > Thanks for the update. It looks pretty good. Just some minor comments
> > below:
> >
> > 1) The KIP adds new error code "LOG_TRUNCATION" and new exception
> TruncatedPartitionException.
> > Can we make the name more consistent, e.g. LogTruncationException?
> >
> > 2) Do we need to add UnknownLeaderEpochException as part of API change?
> >
> > 3) Not sure if the offset topic schema is also public API. If so, maybe
> we
> > should also include the schema change in the API?
> >
> > 4) For users who store offset externally, currently they get offset using
> > position(..), store the offset externally, and use seek(..) to initialize
> > the consumer next time. After this KIP they will need to store and use
> the
> > leaderEpoch together with the offset. Should we also update the API so
> that
> > user can also get leaderEpoch from position(...)? Not sure if it is OK to
> > ask user to track the latest leaderEpoch of ConsumerRecord by themselves.
> >
> > 5) Also for users who store offset externally, they need to call seek(..)
> > with leaderEpoch to initialize consumer. With current KIP users need to
> > call seekToNearest(), whose name suggests that the final position may be
> > different from what was requested. However, if users want to avoid auto
> > offset reset and be notified explicitly when there is log truncation,
> > then seekToNearest() probably does not help here. Would it make sense to
> > replace seekToNearest() with seek(offset, leaderEpoch) +
> > AdminClient.offsetsForLeaderEpochs(...)?
> >
> >
> > Thanks,
> > Dong
> >
> >
> > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> >> Hey Guozhang,
> >>
> >> That's fair. In fact, perhaps we do not need this API at all. We already
> >> have the new seek() in this KIP which can do the lookup based on epoch
> for
> >> this use case. I guess we should probably call it seekToNearest() though
> >> to
> >> make it clear that the final position may be different from what was
> >> requested.
> >>
> >> Thanks,
> >> Jason
> >>
> >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>
> >> > Hi Jason,
> >> >
> >> > I think it is less worthwhile to add
> >> > KafkaConsumer#offsetsForLeaderEpochs, since probably only very advanced
> >> > users are aware of the leaderEpoch, and hence ever care to use it anyway.
> >> > It is more like an admin client operation than a consumer client
> >> > operation: if the motivation is to facilitate a customized reset policy,
> >> > maybe adding it as AdminClient#offsetsForLeaderEpochs is better, as it is
> >> > not an aggressive assumption that such advanced users are willing to use
> >> > an admin client to get further information?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <ja...@confluent.io>
> >> > wrote:
> >> >
> >> > > Thanks for the feedback. I've updated the KIP. Specifically I
> removed
> >> the
> >> > > "closest" reset option and the proposal to reset by timestamp when
> the
> >> > > precise truncation point cannot be determined. Instead, I proposed
> >> that
> >> > we
> >> > > always reset using the nearest epoch when a reset policy is defined
> >> > (either
> >> > > "earliest" or "latest"). Does that sound reasonable?
> >> > >
> >> > > One thing I am still debating is whether it would be better to have
> a
> >> > > separate API to find the closest offset using the leader epoch. In
> the
> >> > > current KIP, I suggested to piggyback this information on an
> >> exception,
> >> > but
> >> > > I'm beginning to think it would be better not to hide the lookup. It
> >> is
> >> > > awkward to implement since it means delaying the exception and the
> API
> >> > may
> >> > > actually be useful when customizing reset logic if no auto reset
> >> policy
> >> > is
> >> > > defined. I was thinking we can add an API like the following:
> >> > >
> >> > > Map<TopicPartition, OffsetAndEpoch>
> >> > > offsetsForLeaderEpochs(Map<TopicPartition, Integer> epochsToSearch)
> >> > >
> >> > > Thoughts?
> >> > >
> >> > > -Jason
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson <
> ja...@confluent.io
> >> >
> >> > > wrote:
> >> > >
> >> > > > @Dong
> >> > > >
> >> > > > Those are fair points. Both approaches require some fuzziness to
> >> reset
> >> > > the
> >> > > > offset in these pathological scenarios and we cannot guarantee
> >> > > > at-least-once delivery either way unless we have the full history
> of
> >> > > leader
> >> > > > epochs that were consumed. The KIP-101 logic may actually be more
> >> > > accurate
> >> > > > than using timestamps because it does not depend on the messages
> >> which
> >> > > are
> >> > > > written after the unclean leader election. The case we're talking
> >> about
> >> > > > should be extremely rare in practice anyway. I also agree that we
> >> may
> >> > not
> >> > > > want to add new machinery if it only helps the old message format.
> >> Ok,
> >> > > > let's go ahead and drop the timestamp.
> >> > > >
> >> > > > @Guozhang
> >> > > >
> >> > > > * My current understanding is that, with unclean leader election
> >> turned
> >> > > on,
> >> > > >> exactly-once is out of the window since we cannot guarantee that
> >> all
> >> > > >> committed message markers will not be lost. And hence there is no
> >> need
> >> > > to
> >> > > >> have special handling logic for LOG_TRUNCATED or OOR error codes
> >> with
> >> > > >> read.committed turned on. Is that right?
> >> > > >
> >> > > >
> >> > > > Yes, that's right. EoS and unclean leader election don't mix well.
> >> It
> >> > may
> >> > > > be worth considering separately whether we should try to reconcile
> >> the
> >> > > > transaction log following an unclean leader election. At least we
> >> may
> >> > be
> >> > > > able to prevent dangling transactions from blocking consumers.
> This
> >> KIP
> >> > > > does not address this problem.
> >> > > >
> >> > > > * MINOR: "if the epoch is greater than the minimum expected epoch,
> >> that
> >> > > the
> >> > > >> new epoch does not begin at an earlier offset than the fetch
> >> offset.
> >> > In
> >> > > >> the latter case, the leader can respond with a new LOG_TRUNCATION
> >> > error
> >> > > >> code" should it be "does not begin at a later offset than the
> fetch
> >> > > >> offset"?
> >> > > >
> >> > > >
> >> > > > I think the comment is correct, though the phrasing may be
> >> confusing.
> >> > We
> >> > > > know truncation has occurred if there exists a larger epoch with a
> >> > > starting
> >> > > > offset that is lower than the fetch offset. Let me try to rephrase
> >> > this.
> >> > > >
> >> > > > Thanks,
> >> > > > Jason
> >> > > >
> >> > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang <
> wangg...@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > >> Jason, thanks for the KIP. A few comments:
> >> > > >>
> >> > > >> * I think Dong's question about whether to use timestamp-based
> >> > approach
> >> > > >> v.s. start-offset-of-first-larger-epoch is valid; more
> >> specifically,
> >> > > with
> >> > > >> timestamp-based approach we may still be resetting to an offset
> >> falling
> >> > > >> into
> >> > > >> the truncated interval, and hence we may still miss some data,
> i.e.
> >> > not
> >> > > >> guaranteeing at-least-once still. With the
> >> > > >> start-offset-of-first-larger-epoch, I'm not sure if it will
> >> guarantee
> >> > > no
> >> > > >> valid data is missed when we have consecutive log truncations
> >> (maybe
> >> > we
> >> > > >> need to look back into details of KIP-101 to figure it out). If
> the
> >> > > latter
> >> > > >> can indeed guarantee at least once, we could consider using that
> >> > > approach.
> >> > > >>
> >> > > >> * My current understanding is that, with unclean leader election
> >> > turned
> >> > > >> on,
> >> > > >> exactly-once is out of the window since we cannot guarantee that
> >> all
> >> > > >> committed message markers will not be lost. And hence there is no
> >> need
> >> > > to
> >> > > >> have special handling logic for LOG_TRUNCATED or OOR error codes
> >> with
> >> > > >> read.committed turned on. Is that right?
> >> > > >>
> >> > > >> * MINOR: "if the epoch is greater than the minimum expected
> epoch,
> >> > that
> >> > > >> the
> >> > > >> new epoch does not begin at an earlier offset than the fetch
> >> offset.
> >> > In
> >> > > >> the latter case, the leader can respond with a new LOG_TRUNCATION
> >> > error
> >> > > >> code" should it be "does not begin at a later offset than the
> fetch
> >> > > >> offset"?
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >> Guozhang
> >> > > >>
> >> > > >>
> >> > > >> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin <lindon...@gmail.com>
> >> > wrote:
> >> > > >>
> >> > > >> > Hey Jason,
> >> > > >> >
> >> > > >> > Thanks for the explanation.
> >> > > >> >
> >> > > >> > Please correct me if this is wrong. The "unknown truncation
> >> offset"
> >> > > >> > scenario happens when consumer does not have the full
> >> leaderEpoch ->
> >> > > >> offset
> >> > > >> > mapping. In this case we can still use the KIP-101-based
> >> approach to
> >> > > >> > truncate offset to "start offset of the first Leader Epoch
> larger
> >> > than
> >> > > >> last
> >> > > >> > epoch of the consumer" but it may be inaccurate. So the KIP
> >> chooses
> >> > to
> >> > > >> use
> >> > > >> > the timestamp-based approach which is also best-effort.
> >> > > >> >
> >> > > >> > If this understanding is correct, for "closest" offset reset
> >> policy
> >> > > and
> >> > > >> > "unknown truncation offset" scenario, I am wondering whether it may
> >> > > >> > be better to replace the timestamp-based approach with the
> >> > > >> > KIP-101-based approach.
> >> > > >> In
> >> > > >> > comparison to timestamp-based approach, the KIP-101-based
> >> approach
> >> > > >> seems to
> >> > > >> > simplify the API a bit since user does not need to understand
> >> > > timestamp.
> >> > > >> > Similar to the timestamp-based approach, both approaches are
> >> > > best-effort
> >> > > >> > and do not guarantee that consumer can consume all messages. It
> >> is
> >> > not
> >> > > >> like
> >> > > >> > KIP-279 which guarantees that follower broker can consume all
> >> > messages
> >> > > >> from
> >> > > >> > the leader.
> >> > > >> >
> >> > > >> > Then it seems that the remaining difference is mostly about
> >> > accuracy,
> >> > > >> i.e.
> >> > > >> > how many messages will be duplicated or missed in the "unknown
> >> > > truncation
> >> > > >> > offset" scenario. Not sure either one is clearly better than
> the
> >> > > other.
> >> > > >> > Note that there are two scenarios mentioned in KIP-279 which
> are
> >> not
> >> > > >> > addressed by KIP-101. Both scenarios require quick leadership
> >> change
> >> > > >> > between brokers, which seems to suggest that the offset obtained
> >> > > >> > by "start
> >> > > >> > offset of the first Leader Epoch larger than last epoch of the
> >> > > consumer"
> >> > > >> > under these two scenarios may be very close to the offset
> >> obtained
> >> > by
> >> > > >> the
> >> > > >> > message timestamp. Does this sound reasonable?
> >> > > >> >
> >> > > >> > Good point that users on v1 format can get benefit with
> timestamp
> >> > > based
> >> > > >> > approach. On the other hand it seems like a short term benefit
> >> for
> >> > > users
> >> > > >> > who have not migrated. I am just not sure whether it is more
> >> > important
> >> > > >> than
> >> > > >> > designing a better API.
> >> > > >> >
> >> > > >> > Also, for both "latest" and "earliest" reset policy, do you
> >> think it
> >> > > >> would
> >> > > >> > make sense to also use the KIP-101 based approach to truncate
> >> offset
> >> > > for
> >> > > >> > the "unknown truncation offset" scenario?
> >> > > >> >
> >> > > >> >
> >> > > >> > Thanks,
> >> > > >> > Dong
> >> > > >> >
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >> --
> >> > > >> -- Guozhang
> >> > > >>
> >> > > >
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
> >
>
