Hey Anna,

Thanks much for the thoughtful reply. It makes sense to different between
"seeking to a message" and "seeking to a position". I have to questions
here:

- For "seeking to a message" use-case, with the proposed approach user
needs to call findOffset(offset, leaderEpoch) followed by seek(offset). If
message truncation and message append happen immediately after
findOffset(offset,
leaderEpoch) but before seek(offset), it seems that user will seek to the
wrong message without knowing the truncation has happened. Would this be a
problem?

- For "seeking to a position" use-case, it seems that there can be two
positions, i.e. earliest and latest. So these two cases can be
Consumer.fulfilled by seekToBeginning() and Consumer.seekToEnd(). Then it
seems that user will only need to call position() and seek() for "seeking
to a message" use-case?

Thanks,
Dong


On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io> wrote:

> Hi Jason and Dong,
>
>
> I’ve been thinking about your suggestions and discussion regarding
> position(), seek(), and new proposed API.
>
>
> Here is my thought process why we should keep position() and seek() API
> unchanged.
>
>
> I think we should separate {offset, leader epoch} that uniquely identifies
> a message from an offset that is a position. In some cases, offsets
> returned from position() could be actual consumed messages by this consumer
> identified by {offset, leader epoch}. In other cases, position() returns
> offset that was not actually consumed. Suppose, the user calls position()
> for the last offset. Suppose we return {offset, leader epoch} of the
> message currently in the log. Then, the message gets truncated before
> consumer’s first poll(). It does not make sense for poll() to fail in this
> case, because the log truncation did not actually happen from the consumer
> perspective. On the other hand, as the KIP proposes, it makes sense for the
> committed() method to return {offset, leader epoch} because those offsets
> represent actual consumed messages.
>
>
> The same argument applies to the seek() method — we are not seeking to a
> message, we are seeking to a position.
>
>
> I like the proposal to add KafkaConsumer#findOffsets() API. I am assuming
> something like:
>
> Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch>
> offsetsToSearch)
>
> Similar to seek() and position(), I think findOffsets() should return
> offset without leader epoch, because what we want is the offset that we
> think is closest to the not divergent message from the given consumed
> message. Until the consumer actually fetches the message, we should not let
> the consumer store the leader epoch for a message it did not consume.
>
>
> So, the workflow will be:
>
> 1) The user gets LogTruncationException with {offset, leader epoch of the
> previous message} (whatever we send with new FetchRecords request).
>
> 2) offset = findOffsets(tp -> {offset, leader epoch})
>
> 3) seek(offset)
>
>
> For the use-case where the users store committed offsets externally:
>
> 1) Such users would have to track the leader epoch together with an offset.
> Otherwise, there is no way to detect later what leader epoch was associated
> with the message. I think it’s reasonable to ask that from users if they
> want to detect log truncation. Otherwise, they will get the current
> behavior.
>
>
> If the users currently get an offset to be stored using position(), I see
> two possibilities. First, they call save offset returned from position()
> that they call before poll(). In that case, it would not be correct to
> store {offset, leader epoch} if we would have changed position() to return
> {offset, leader epoch} since actual fetched message could be different
> (from the example I described earlier). So, it would be more correct to
> call position() after poll(). However, the user already gets
> ConsumerRecords at this point, from which the user can extract {offset,
> leader epoch} of the last message.
>
>
> So, I like the idea of adding a helper method to ConsumerRecords, as Jason
> proposed, something like:
>
> public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is
> a data struct holding {offset, leader epoch}.
>
>
> In this case, we would advise the user to follow the workflow: poll(), get
> {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(),
> save offset and leader epoch, process records.
>
>
> 2) When the user needs to seek to the last committed offset, they call new
> findOffsets(saved offset, leader epoch), and then seek(offset).
>
>
> What do you think?
>
>
> Thanks,
>
> Anna
>
>
> On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jason,
> >
> > Thanks much for your thoughtful explanation.
> >
> > Yes the solution using findOffsets(offset, leaderEpoch) also works. The
> > advantage of this solution it adds only one API instead of two APIs. The
> > concern is that its usage seems a bit more clumsy for advanced users.
> More
> > specifically, advanced users who store offsets externally will always
> need
> > to call findOffsets() before calling seek(offset) during consumer
> > initialization. And those advanced users will need to manually keep track
> > of the leaderEpoch of the last ConsumerRecord.
> >
> > The other solution may be more user-friendly for advanced users is to add
> > two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) =
> > offsetEpochs(topicPartition)`.
> >
> > I kind of prefer the second solution because it is easier to use for
> > advanced users. If we need to expose leaderEpoch anyway to safely
> identify
> > a message, it may be conceptually simpler to expose it directly in
> > seek(...) rather than requiring one more translation using
> > findOffsets(...). But I am also OK with the first solution if other
> > developers also favor that one :)
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks, I've been thinking about your suggestions a bit. It is
> > challenging
> > > to make this work given the current APIs. One of the difficulties is
> that
> > > we don't have an API to find the leader epoch for a given offset at the
> > > moment. So if the user does a seek to offset 5, then we'll need a new
> API
> > > to find the corresponding epoch in order to fulfill the new position()
> > API.
> > > Potentially we could modify ListOffsets to enable finding the leader
> > epoch,
> > > but I am not sure it is worthwhile. Perhaps it is reasonable for
> advanced
> > > usage to expect that the epoch information, if needed, will be
> extracted
> > > from the records directly? It might make sense to expose a helper in
> > > `ConsumerRecords` to make this a little easier though.
> > >
> > > Alternatively, if we think it is important to have this information
> > exposed
> > > directly, we could create batch APIs to solve the naming problem. For
> > > example:
> > >
> > > Map<TopicPartition, OffsetAndEpoch> positions();
> > > void seek(Map<TopicPartition, OffsetAndEpoch> positions);
> > >
> > > However, I'm actually leaning toward leaving the seek() and position()
> > APIs
> > > unchanged. Instead, we can add a new API to search for offset by
> > timestamp
> > > or by offset/leader epoch. Let's say we call it `findOffsets`. If the
> > user
> > > hits a log truncation error, they can use this API to find the closest
> > > offset and then do a seek(). At the same time, we deprecate the
> > > `offsetsForTimes` APIs. We now have two use cases which require finding
> > > offsets, so I think we should make this API general and leave the door
> > open
> > > for future extensions.
> > >
> > > By the way, I'm unclear about the desire to move part of this
> > functionality
> > > to AdminClient. Guozhang suggested this previously, but I think it only
> > > makes sense for cross-cutting capabilities such as topic creation. If
> we
> > > have an API which is primarily useful by consumers, then I think that's
> > > where it should be exposed. The AdminClient also has its own API
> > integrity
> > > and should not become a dumping ground for advanced use cases. I'll
> > update
> > > the KIP with the  `findOffsets` API suggested above and we can see if
> it
> > > does a good enough job of keeping the API simple for common cases.
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jason,
> > > >
> > > > Regarding seek(...), it seems that we want an API for user to
> > initialize
> > > > consumer with (offset, leaderEpoch) and that API should allow
> throwing
> > > > PartitionTruncationException. Suppose we agree on this, then
> > > > seekToNearest() is not sufficient because it will always swallow
> > > > PartitionTruncationException. Here we have two options. The first
> > option
> > > is
> > > > to add API offsetsForLeaderEpochs() to translate (leaderEpoch,
> offset)
> > to
> > > > offset. The second option is to have add seek(offset, leaderEpoch).
> It
> > > > seems that second option may be more simpler because it makes it
> clear
> > > that
> > > > (offset, leaderEpoch) will be used to identify consumer's position
> in a
> > > > partition. And user only needs to handle PartitionTruncationException
> > > from
> > > > the poll(). In comparison the first option seems a bit harder to use
> > > > because user have to also handle the PartitionTruncationException if
> > > > offsetsForLeaderEpochs() returns different offset from user-provided
> > > > offset. What do you think?
> > > >
> > > > If we decide to add API seek(offset, leaderEpoch), then we can decide
> > > > whether and how to add API to translate (offset, leaderEpoch) to
> > offset.
> > > It
> > > > seems that this API will be needed by advanced user to don't want
> auto
> > > > offset reset (so that it can be notified) but still wants to reset
> > offset
> > > > to closest. For those users if probably makes sense to only have the
> > API
> > > in
> > > > AdminClient. offsetsForTimes() seems like a common API that will be
> > > needed
> > > > by user's of consumer in general, so it may be more reasonable to
> stay
> > in
> > > > the consumer API. I don't have a strong opinion on whether
> > > > offsetsForTimes() should be replaced by API in AdminClient.
> > > >
> > > > Though (offset, leaderEpoch) is needed to uniquely identify a message
> > in
> > > > general, it is only needed for advanced users who has turned on
> unclean
> > > > leader election, need to use seek(..), and don't want auto offset
> > reset.
> > > > Most other users probably just want to enable auto offset reset and
> > store
> > > > offset in Kafka. Thus we might want to keep the existing offset-only
> > APIs
> > > > (e.g. seek() and position()) for most users while adding new APIs for
> > > > advanced users. And yes, it seems that we need new name for
> position().
> > > >
> > > > Though I think we need new APIs to carry the new information (e.g.
> > > > leaderEpoch), I am not very sure how that should look like. One
> > possible
> > > > option is those APIs in KIP-232. Another option is something like
> this:
> > > >
> > > > `````
> > > > class OffsetEpochs {
> > > >   long offset;
> > > >   int leaderEpoch;
> > > >   int partitionEpoch;   // This may be needed later as discussed in
> > > KIP-232
> > > >   ... // Hopefully these are all we need to identify message in
> Kafka.
> > > But
> > > > if we need more then we can add new fields in this class.
> > > > }
> > > >
> > > > OffsetEpochs offsetEpochs(TopicPartition);
> > > >
> > > > void seek(TopicPartition, OffsetEpochs);
> > > > ``````
> > > >
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <
> ja...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hey Dong,
> > > > >
> > > > > Thanks for the feedback. The first three points are easy:
> > > > >
> > > > > 1. Yes, we should be consistent.
> > > > > 2. Yes, I will add this.
> > > > > 3. Yes, I think we should document the changes to the committed
> > offset
> > > > > schema. I meant to do this, but it slipped my mind.
> > > > >
> > > > > The latter questions are tougher. One option I was considering is
> to
> > > have
> > > > > only `offsetsForLeaderEpochs` exposed from the consumer and to drop
> > the
> > > > new
> > > > > seek() API. That seems more consistent with the current use of
> > > > > `offsetsForTimes` (we don't have a separate `seekToTimestamp` API).
> > An
> > > > > alternative might be to take a page from the AdminClient API and
> add
> > a
> > > > new
> > > > > method to generalize offset lookup. For example, we could have
> > > > > `lookupOffsets(LookupOptions)`. We could then deprecate
> > > > `offsetsForTimes`
> > > > > and this would open the door for future extensions without needing
> > new
> > > > > APIs.
> > > > >
> > > > > The case of position() is a little more annoying. It would have
> been
> > > > better
> > > > > had we let this return an object so that it is easier to extend.
> This
> > > is
> > > > > the only reason I didn't add the API to the KIP. Maybe we should
> bite
> > > the
> > > > > bullet and fix this now? Unfortunately we'll have to come up with a
> > new
> > > > > name. Maybe `currentPosition`?
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > -Jason
> > > > >
> > > > >
> > > > > On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Regarding points 4) and 5) above, motivation for the alternative
> > APIs
> > > > is
> > > > > > that, if we decide that leaderEpoch is equally important as
> offset
> > in
> > > > > > identifying a message, then it may be reasonable to always
> specify
> > it
> > > > > > wherever offset is currently required in the consumer API to
> > > identify a
> > > > > > message, e.g. position(), seek(). For example, since we allow
> user
> > to
> > > > > > retrieve offset using position() instead of asking user to keep
> > track
> > > > of
> > > > > > the offset of the latest ConsumerRecord, may be it will be more
> > > > > consistent
> > > > > > for user to also retrieve  leaderEpoch using position()?
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hey Jason,
> > > > > > >
> > > > > > > Thanks for the update. It looks pretty good. Just some minor
> > > comments
> > > > > > > below:
> > > > > > >
> > > > > > > 1) The KIP adds new error code "LOG_TRUNCATION" and new
> exception
> > > > > > TruncatedPartitionException.
> > > > > > > Can we make the name more consistent, e.g.
> > LogTruncationException?
> > > > > > >
> > > > > > > 2) Do we need to add UnknownLeaderEpochException as part of API
> > > > change?
> > > > > > >
> > > > > > > 3) Not sure if the offset topic schema is also public API. If
> so,
> > > > maybe
> > > > > > we
> > > > > > > should also include the schema change in the API?
> > > > > > >
> > > > > > > 4) For users who store offset externally, currently they get
> > offset
> > > > > using
> > > > > > > position(..), store the offset externally, and use seek(..) to
> > > > > initialize
> > > > > > > the consumer next time. After this KIP they will need to store
> > and
> > > > use
> > > > > > the
> > > > > > > leaderEpoch together with the offset. Should we also update the
> > API
> > > > so
> > > > > > that
> > > > > > > user can also get leaderEpoch from position(...)? Not sure if
> it
> > is
> > > > OK
> > > > > to
> > > > > > > ask user to track the latest leaderEpoch of ConsumerRecord by
> > > > > themselves.
> > > > > > >
> > > > > > > 5) Also for users who store offset externally, they need to
> call
> > > > > seek(..)
> > > > > > > with leaderEpoch to initialize consumer. With current KIP users
> > > need
> > > > to
> > > > > > > call seekToNearest(), whose name suggests that the final
> position
> > > may
> > > > > be
> > > > > > > different from what was requested. However, if users may want
> to
> > > > avoid
> > > > > > auto
> > > > > > > offset reset and be notified explicitly when there is log
> > > truncation,
> > > > > > then seekToNearest()
> > > > > > > probably does not help here. Would it make sense to replace
> > > > > > seekToNearest()
> > > > > > > with seek(offset, leaderEpoch) + AminClient.
> > > > > offsetsForLeaderEpochs(...)?
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <
> > > ja...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hey Guozhang,
> > > > > > >>
> > > > > > >> That's fair. In fact, perhaps we do not need this API at all.
> We
> > > > > already
> > > > > > >> have the new seek() in this KIP which can do the lookup based
> on
> > > > epoch
> > > > > > for
> > > > > > >> this use case. I guess we should probably call it
> > seekToNearest()
> > > > > though
> > > > > > >> to
> > > > > > >> make it clear that the final position may be different from
> what
> > > was
> > > > > > >> requested.
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Jason
> > > > > > >>
> > > > > > >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <
> > > wangg...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Hi Jason,
> > > > > > >> >
> > > > > > >> > I think it is less worthwhile to add
> > > > KafkaConsumer#offsetsForLeader
> > > > > > >> Epochs,
> > > > > > >> > since probably only very advanced users are aware of the
> > > > > leaderEpoch,
> > > > > > >> and
> > > > > > >> > hence ever care to use it anyways. It is more like an admin
> > > client
> > > > > > >> > operation than a consumer client operation: if the
> motivation
> > is
> > > > to
> > > > > > >> > facility customized reset policy, maybe adding it as
> > > > > > >> > AdminClient#offsetsForLeaderEpochs
> > > > > > >> > is better as it is not an aggressive assumption that for
> such
> > > > > advanced
> > > > > > >> > users they are willing to use some admin client to get
> further
> > > > > > >> information?
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Guozhang
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <
> > > > > ja...@confluent.io>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > Thanks for the feedback. I've updated the KIP.
> Specifically
> > I
> > > > > > removed
> > > > > > >> the
> > > > > > >> > > "closest" reset option and the proposal to reset by
> > timestamp
> > > > when
> > > > > > the
> > > > > > >> > > precise truncation point cannot be determined. Instead, I
> > > > proposed
> > > > > > >> that
> > > > > > >> > we
> > > > > > >> > > always reset using the nearest epoch when a reset policy
> is
> > > > > defined
> > > > > > >> > (either
> > > > > > >> > > "earliest" or "latest"). Does that sound reasonable?
> > > > > > >> > >
> > > > > > >> > > One thing I am still debating is whether it would be
> better
> > to
> > > > > have
> > > > > > a
> > > > > > >> > > separate API to find the closest offset using the leader
> > > epoch.
> > > > In
> > > > > > the
> > > > > > >> > > current KIP, I suggested to piggyback this information on
> an
> > > > > > >> exception,
> > > > > > >> > but
> > > > > > >> > > I'm beginning to think it would be better not to hide the
> > > > lookup.
> > > > > It
> > > > > > >> is
> > > > > > >> > > awkward to implement since it means delaying the exception
> > and
> > > > the
> > > > > > API
> > > > > > >> > may
> > > > > > >> > > actually be useful when customizing reset logic if no auto
> > > reset
> > > > > > >> policy
> > > > > > >> > is
> > > > > > >> > > defined. I was thinking we can add an API like the
> > following:
> > > > > > >> > >
> > > > > > >> > > Map<TopicPartition, OffsetAndEpoch>
> > > > > > >> > > offsetsForLeaderEpochs(Map<TopicPartition, Integer>
> > > > > epochsToSearch)
> > > > > > >> > >
> > > > > > >> > > Thoughts?
> > > > > > >> > >
> > > > > > >> > > -Jason
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson <
> > > > > > ja...@confluent.io
> > > > > > >> >
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > @Dong
> > > > > > >> > > >
> > > > > > >> > > > Those are fair points. Both approaches require some
> > > fuzziness
> > > > to
> > > > > > >> reset
> > > > > > >> > > the
> > > > > > >> > > > offset in these pathological scenarios and we cannot
> > > guarantee
> > > > > > >> > > > at-least-once delivery either way unless we have the
> full
> > > > > history
> > > > > > of
> > > > > > >> > > leader
> > > > > > >> > > > epochs that were consumed. The KIP-101 logic may
> actually
> > be
> > > > > more
> > > > > > >> > > accurate
> > > > > > >> > > > than using timestamps because it does not depend on the
> > > > messages
> > > > > > >> which
> > > > > > >> > > are
> > > > > > >> > > > written after the unclean leader election. The case
> we're
> > > > > talking
> > > > > > >> about
> > > > > > >> > > > should be extremely rare in practice anyway. I also
> agree
> > > that
> > > > > we
> > > > > > >> may
> > > > > > >> > not
> > > > > > >> > > > want to add new machinery if it only helps the old
> message
> > > > > format.
> > > > > > >> Ok,
> > > > > > >> > > > let's go ahead and drop the timestamp.
> > > > > > >> > > >
> > > > > > >> > > > @Guozhang
> > > > > > >> > > >
> > > > > > >> > > > * My current understanding is that, with unclean leader
> > > > election
> > > > > > >> turned
> > > > > > >> > > on,
> > > > > > >> > > >> exactly-once is out of the window since we cannot
> > guarantee
> > > > > that
> > > > > > >> all
> > > > > > >> > > >> committed message markers will not be lost. And hence
> > there
> > > > is
> > > > > no
> > > > > > >> need
> > > > > > >> > > to
> > > > > > >> > > >> have special handling logic for LOG_TRUNCATED or OOR
> > error
> > > > > codes
> > > > > > >> with
> > > > > > >> > > >> read.committed turned on. Is that right?
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > Yes, that's right. EoS and unclean leader election don't
> > mix
> > > > > well.
> > > > > > >> It
> > > > > > >> > may
> > > > > > >> > > > be worth considering separately whether we should try to
> > > > > reconcile
> > > > > > >> the
> > > > > > >> > > > transaction log following an unclean leader election. At
> > > least
> > > > > we
> > > > > > >> may
> > > > > > >> > be
> > > > > > >> > > > able to prevent dangling transactions from blocking
> > > consumers.
> > > > > > This
> > > > > > >> KIP
> > > > > > >> > > > does not address this problem.
> > > > > > >> > > >
> > > > > > >> > > > * MINOR: "if the epoch is greater than the minimum
> > expected
> > > > > epoch,
> > > > > > >> that
> > > > > > >> > > the
> > > > > > >> > > >> new epoch does not begin at an earlier offset than the
> > > fetch
> > > > > > >> offset.
> > > > > > >> > In
> > > > > > >> > > >> the latter case, the leader can respond with a new
> > > > > LOG_TRUNCATION
> > > > > > >> > error
> > > > > > >> > > >> code" should it be "does not begin at a later offset
> than
> > > the
> > > > > > fetch
> > > > > > >> > > >> offset"?
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > I think the comment is correct, though the phrasing may
> be
> > > > > > >> confusing.
> > > > > > >> > We
> > > > > > >> > > > know truncation has occurred if there exists a larger
> > epoch
> > > > > with a
> > > > > > >> > > starting
> > > > > > >> > > > offset that is lower than the fetch offset. Let me try
> to
> > > > > rephrase
> > > > > > >> > this.
> > > > > > >> > > >
> > > > > > >> > > > Thanks,
> > > > > > >> > > > Jason
> > > > > > >> > > >
> > > > > > >> > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang <
> > > > > > wangg...@gmail.com>
> > > > > > >> > > wrote:
> > > > > > >> > > >
> > > > > > >> > > >> Jason, thanks for the KIP. A few comments:
> > > > > > >> > > >>
> > > > > > >> > > >> * I think Dong's question about whether to use
> > > > timestamp-based
> > > > > > >> > approach
> > > > > > >> > > >> v.s. start-offset-of-first-larger-epoch is valid; more
> > > > > > >> specifically,
> > > > > > >> > > with
> > > > > > >> > > >> timestamp-based approach we may still be reseting to an
> > > > offset
> > > > > > >> falling
> > > > > > >> > > >> into
> > > > > > >> > > >> the truncated interval, and hence we may still miss
> some
> > > > data,
> > > > > > i.e.
> > > > > > >> > not
> > > > > > >> > > >> guaranteeing at-least-once still. With the
> > > > > > >> > > >> start-offset-of-first-larger-epoch, I'm not sure if it
> > > will
> > > > > > >> guarantee
> > > > > > >> > > no
> > > > > > >> > > >> valid data is missed when we have consecutive log
> > > truncations
> > > > > > >> (maybe
> > > > > > >> > we
> > > > > > >> > > >> need to look back into details of KIP-101 to figure it
> > > out).
> > > > If
> > > > > > the
> > > > > > >> > > latter
> > > > > > >> > > >> can indeed guarantee at least once, we could consider
> > using
> > > > > that
> > > > > > >> > > approach.
> > > > > > >> > > >>
> > > > > > >> > > >> * My current understanding is that, with unclean leader
> > > > > election
> > > > > > >> > turned
> > > > > > >> > > >> on,
> > > > > > >> > > >> exactly-once is out of the window since we cannot
> > guarantee
> > > > > that
> > > > > > >> all
> > > > > > >> > > >> committed message markers will not be lost. And hence
> > there
> > > > is
> > > > > no
> > > > > > >> need
> > > > > > >> > > to
> > > > > > >> > > >> have special handling logic for LOG_TRUNCATED or OOR
> > error
> > > > > codes
> > > > > > >> with
> > > > > > >> > > >> read.committed turned on. Is that right?
> > > > > > >> > > >>
> > > > > > >> > > >> * MINOR: "if the epoch is greater than the minimum
> > expected
> > > > > > epoch,
> > > > > > >> > that
> > > > > > >> > > >> the
> > > > > > >> > > >> new epoch does not begin at an earlier offset than the
> > > fetch
> > > > > > >> offset.
> > > > > > >> > In
> > > > > > >> > > >> the latter case, the leader can respond with a new
> > > > > LOG_TRUNCATION
> > > > > > >> > error
> > > > > > >> > > >> code" should it be "does not begin at a later offset
> than
> > > the
> > > > > > fetch
> > > > > > >> > > >> offset"?
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >> Guozhang
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin <
> > > > lindon...@gmail.com
> > > > > >
> > > > > > >> > wrote:
> > > > > > >> > > >>
> > > > > > >> > > >> > Hey Jason,
> > > > > > >> > > >> >
> > > > > > >> > > >> > Thanks for the explanation.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Please correct me if this is wrong. The "unknown
> > > truncation
> > > > > > >> offset"
> > > > > > >> > > >> > scenario happens when consumer does not have the full
> > > > > > >> leaderEpoch ->
> > > > > > >> > > >> offset
> > > > > > >> > > >> > mapping. In this case we can still use the
> > KIP-101-based
> > > > > > >> approach to
> > > > > > >> > > >> > truncate offset to "start offset of the first Leader
> > > Epoch
> > > > > > larger
> > > > > > >> > than
> > > > > > >> > > >> last
> > > > > > >> > > >> > epoch of the consumer" but it may be inaccurate. So
> the
> > > KIP
> > > > > > >> chooses
> > > > > > >> > to
> > > > > > >> > > >> use
> > > > > > >> > > >> > the timestamp-based approach which is also
> best-effort.
> > > > > > >> > > >> >
> > > > > > >> > > >> > If this understanding is correct, for "closest"
> offset
> > > > reset
> > > > > > >> policy
> > > > > > >> > > and
> > > > > > >> > > >> > "unknown truncation offset" scenario, I am wondering
> > > > whether
> > > > > it
> > > > > > >> > maybe
> > > > > > >> > > >> > better to replace timestamp-based approach with
> KIP-101
> > > > based
> > > > > > >> > > approach.
> > > > > > >> > > >> In
> > > > > > >> > > >> > comparison to timestamp-based approach, the
> > KIP-101-based
> > > > > > >> approach
> > > > > > >> > > >> seems to
> > > > > > >> > > >> > simplify the API a bit since user does not need to
> > > > understand
> > > > > > >> > > timestamp.
> > > > > > >> > > >> > Similar to the timestamp-based approach, both
> > approaches
> > > > are
> > > > > > >> > > best-effort
> > > > > > >> > > >> > and do not guarantee that consumer can consume all
> > > > messages.
> > > > > It
> > > > > > >> is
> > > > > > >> > not
> > > > > > >> > > >> like
> > > > > > >> > > >> > KIP-279 which guarantees that follower broker can
> > consume
> > > > all
> > > > > > >> > messages
> > > > > > >> > > >> from
> > > > > > >> > > >> > the leader.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Then it seems that the remaining difference is mostly
> > > about
> > > > > > >> > accuracy,
> > > > > > >> > > >> i.e.
> > > > > > >> > > >> > how much message will be duplicated or missed in the
> > > > "unknown
> > > > > > >> > > truncation
> > > > > > >> > > >> > offset" scenario. Not sure either one is clearly
> better
> > > > than
> > > > > > the
> > > > > > >> > > other.
> > > > > > >> > > >> > Note that there are two scenarios mentioned in
> KIP-279
> > > > which
> > > > > > are
> > > > > > >> not
> > > > > > >> > > >> > addressed by KIP-101. Both scenarios require quick
> > > > leadership
> > > > > > >> change
> > > > > > >> > > >> > between brokers, which seems to suggest that the
> offset
> > > > based
> > > > > > >> > obtained
> > > > > > >> > > >> > by "start
> > > > > > >> > > >> > offset of the first Leader Epoch larger than last
> epoch
> > > of
> > > > > the
> > > > > > >> > > consumer"
> > > > > > >> > > >> > under these two scenarios may be very close to the
> > offset
> > > > > > >> obtained
> > > > > > >> > by
> > > > > > >> > > >> the
> > > > > > >> > > >> > message timestamp. Does this sound reasonable?
> > > > > > >> > > >> >
> > > > > > >> > > >> > Good point that users on v1 format can get benefit
> with
> > > > > > timestamp
> > > > > > >> > > based
> > > > > > >> > > >> > approach. On the other hand it seems like a short
> term
> > > > > benefit
> > > > > > >> for
> > > > > > >> > > users
> > > > > > >> > > >> > who have not migrated. I am just not sure whether it
> is
> > > > more
> > > > > > >> > important
> > > > > > >> > > >> than
> > > > > > >> > > >> > designing a better API.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Also, for both "latest" and "earliest" reset policy,
> do
> > > you
> > > > > > >> think it
> > > > > > >> > > >> would
> > > > > > >> > > >> > make sense to also use the KIP-101 based approach to
> > > > truncate
> > > > > > >> offset
> > > > > > >> > > for
> > > > > > >> > > >> > the "unknown truncation offset" scenario?
> > > > > > >> > > >> >
> > > > > > >> > > >> >
> > > > > > >> > > >> > Thanks,
> > > > > > >> > > >> > Dong
> > > > > > >> > > >> >
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >> --
> > > > > > >> > > >> -- Guozhang
> > > > > > >> > > >>
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > --
> > > > > > >> > -- Guozhang
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to