>
> I am not sure I understand the benefit of incrementing this epoch after
> topic deletion. At a high level, client can not differentiate between topic
> deletion and topic creation when the global epoch is incremented. Can you
> provide more specific use-case?


Say you send two metadata requests to two separate brokers. In the
responses, one of them says a certain topic exists and one says it does
not. Who is right? My suggestion is to bump the topic epoch on deletion and
include it in the metadata response when returning
UNKNOWN_TOPIC_OR_PARTITION. Then the client always knows which metadata is
more current (if not necessarily up to date). Because of this ambiguity,
Kafka clients currently have no choice but retry on unknown topic errors.
Yes, you can say it is already handled, but this gives us some better
options in the future. In the consumer, users are often asking to be
notified when they attempt to fetch from unknown topics, for example,
because it could indicate a configuration problem. We have difficulty
supporting this at the moment.

Currently when broker returns UNKNOWN_TOPIC_OR_PARTITION, it means that the
> topic is not follower or leader of this partition. Note that
> UNKNOWN_TOPIC_OR_PARTITION does not necessarily tell client whether this
> partition exists on other broker or not. UNKNOWN_TOPIC_OR_PARTITION can be
> caused either when the broker has not yet processed the latest
> LeaderAndIsrRequest, or the client is using outdated metadata.


I don't think this is right. Metadata is propagated through the
UpdateMetadata request which the controller sends to all brokers. Brokers
will return UNKNOWN_TOPIC_OR_PARTITION in a metadata response if they don't
have metadata cached for the requested topic.

There is one problem though which I think might be what you're getting at.
After a topic is deleted, the controller will leave it out of future
UpdateMetadata requests, which means the deleted epoch would not be
propagated to all brokers and we'd be stuck in the current state. Suppose
instead that when a topic is deleted, we 1) bump the topic epoch, and 2)
set an expiration time (say several hours). When the expiration time is
reached, we delete the topic metadata in zookeeper; until then, the
controller continues to propagate it as usual with a flag indicating it no
longer exists. The point of the epoch is solving edge cases around topic
deletion and recreation, so the expiration timer gives clients a window to
observe the deletion before the metadata is removed. It also ensures that
topic metadata is eventually cleaned up following deletion.

What do you think?

In comparison to byte[], String has the benefit of being more readable and
> it is also the same type of the existing metadata field, which is used for
> a similar purpose by user. Do you think this is reasonable?


I don't have too strong of a feeling about it. I'm not sure how important
readability is since it's intended to be opaque to the user. To clarify a
little bit, I think we should continue to send the topic and leader epochs
in the protocol directly as separate fields. It's only when we surface it
through the consumer API that we add some obscurity since we don't want
users to depend on the fields directly and we don't want to make API
changes in the future if we need to add something else which is also
internal. In fact, rather than using byte[] or String directly, perhaps we
could just expose it as an object and give it a readable toString()?


Thanks,
Jason


On Fri, Jan 5, 2018 at 5:12 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jason,
>
> Thanks a lot for the comments. I will comment inline. And I have updated
> the KIP accordingly. Could you take another look?
>
> On Fri, Jan 5, 2018 at 11:15 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi Dong,
> >
> > Sorry for the late reply. I think the latest revision is looking good. I
> > have a few minor suggestions:
> >
> > 1. The name "partition_epoch" makes me think it changes independently at
> > the partition level, but all partitions for a topic should have the same
> > epoch. Maybe "topic_epoch" is nearer the mark?
> >
>
> Actually, in the current proposal, partitions of the same topic will have
> different epoch. Every time a new partition is created, either due to topic
> creation or partition expansion, the global epoch is incremented by 1 and
> is assigned to that partition. This is probably why we currently call it
> partition_epoch.
>
> Thinking about your idea more, one alternative approach following your idea
> is to use a topic_epoch is that incremented by 1 whenever we create a
> topic. We should store a single topic_epoch in
> znode /brokers/topics/[topic] without storing the list of partition_epoch
> for all partitions. This same epoch will be used for the new partitions
> after partition expansion of the existing topic. This approach has more
> simpler znode format than the existing KIP and it still allows us to detect
> topic created after topic deletion. I think this is better. I have updated
> the KIP with this approach.
>
>
> > 2. Should we increment this epoch when a topic is deleted also? When the
> > broker returns an UNKNOWN_TOPIC_OR_PARTITION error in a metadata
> response,
> > we can also include the latest partition epoch, which would allow the
> > client to disambiguate the error if it has seen more recent metadata.
> >
>
> I am not sure I understand the benefit of incrementing this epoch after
> topic deletion. At a high level, client can not differentiate between topic
> deletion and topic creation when the global epoch is incremented. Can you
> provide more specific use-case?
>
> Currently when broker returns UNKNOWN_TOPIC_OR_PARTITION, it means that the
> topic is not follower or leader of this partition. Note that
> UNKNOWN_TOPIC_OR_PARTITION does not necessarily tell client whether this
> partition exists on other broker or not. UNKNOWN_TOPIC_OR_PARTITION can be
> caused either when the broker has not yet processed the latest
> LeaderAndIsrRequest, or the client is using outdated metadata. In either
> case, the client needs to retry and possibly refresh metadata, which is
> already done by client with the current Kafka implementation. So it seems
> that we don't have a problem to fix here?
>
>
>
> > 3. I am still wondering whether it is a good idea to expose these epochs
> in
> > the consumer API. As an alternative, have you considered representing the
> > data as an opaque blob of bytes? For example:
> >
> > class OffsetAndMetadata {
> >   long offset;
> >   byte[] offsetMetadata;
> >   String metadata;
> > }
> >
> > Admittedly, the naming is a bit annoying, but we can probably come up
> with
> > something better. Internally the byte array would have a version. If in
> the
> > future we have anything else we need to add, we can update the version
> and
> > we wouldn't need any new APIs.
> >
> > The corresponding seek() and position() APIs might look something like
> > this:
> >
> > void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
> > byte[] positionMetadata(TopicPartition partition);
> >
> > What do you think?
> >
>
> I think it is a very good idea to consolidate the new information into a
> single field rather than explicitly listing them with specific types. This
> provides us the advantage of evolving the information in this field in the
> future.
>
> But I probably would prefer to use String rather than byte[] as the type of
> this new field. This string can probably have the following json format:
>
> {
>   "version": 1,
>   "topic_epoch": int,
>   "leader_epoch": int.
> }
>
> In comparison to byte[], String has the benefit of being more readable and
> it is also the same type of the existing metadata field, which is used for
> a similar purpose by user. Do you think this is reasonable?
>
>
>
> >
> > Thanks,
> > Jason
> >
> > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun, Jason,
> > >
> > > Thanks much for all the feedback. I have updated the KIP based on the
> > > latest discussion. Can you help check whether it looks good?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Hmm... thinking about this more, I am not sure that the proposed API
> is
> > > > sufficient. For users that store offset externally, we probably need
> > > extra
> > > > API to return the leader_epoch and partition_epoch for all partitions
> > > that
> > > > consumers are consuming. I suppose these users currently use
> position()
> > > to
> > > > get the offset. Thus we probably need a new method
> > positionWithEpoch(..)
> > > to
> > > > return <offset, partition_epoch, leader_epoch>. Does this sound
> > > reasonable?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > >> Hi, Dong,
> > > >>
> > > >> Yes, that's what I am thinking. OffsetEpoch will be composed of
> > > >> (partition_epoch,
> > > >> leader_epoch).
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >>
> > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >>
> > > >> > Hey Jun,
> > > >> >
> > > >> > Thanks much. I like the the new API that you proposed. I am not
> sure
> > > >> what
> > > >> > you exactly mean by offset_epoch. I suppose that we can use the
> pair
> > > of
> > > >> > (partition_epoch, leader_epoch) as the offset_epoch, right?
> > > >> >
> > > >> > Thanks,
> > > >> > Dong
> > > >> >
> > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:
> > > >> >
> > > >> > > Hi, Dong,
> > > >> > >
> > > >> > > Got it. The api that you proposed works. The question is whether
> > > >> that's
> > > >> > the
> > > >> > > api that we want to have in the long term. My concern is that
> > while
> > > >> the
> > > >> > api
> > > >> > > change is simple, the new api seems harder to explain and use.
> For
> > > >> > example,
> > > >> > > a consumer storing offsets externally now needs to call
> > > >> > > waitForMetadataUpdate() after calling seek().
> > > >> > >
> > > >> > > An alternative approach is to make the following compatible api
> > > >> changes
> > > >> > in
> > > >> > > Consumer.
> > > >> > > * Add an additional OffsetEpoch field in OffsetAndMetadata. (no
> > need
> > > >> to
> > > >> > > change the CommitSync() api)
> > > >> > > * Add a new api seek(TopicPartition partition, long offset,
> > > >> OffsetEpoch
> > > >> > > offsetEpoch). We can potentially deprecate the old api
> > > >> > seek(TopicPartition
> > > >> > > partition, long offset) in the future.
> > > >> > >
> > > >> > > The alternative approach has similar amount of api changes as
> > yours
> > > >> but
> > > >> > has
> > > >> > > the following benefits.
> > > >> > > 1. The api works in a similar way as how offset management works
> > now
> > > >> and
> > > >> > is
> > > >> > > probably what we want in the long term.
> > > >> > > 2. It can reset offsets better when there is data loss due to
> > > unclean
> > > >> > > leader election or correlated replica failure.
> > > >> > > 3. It can reset offsets better when topic is recreated.
> > > >> > >
> > > >> > > Thanks,
> > > >> > >
> > > >> > > Jun
> > > >> > >
> > > >> > >
> > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > >> > >
> > > >> > > > Hey Jun,
> > > >> > > >
> > > >> > > > Yeah I agree that ideally we don't want an ever growing global
> > > >> metadata
> > > >> > > > version. I just think it may be more desirable to keep the
> > > consumer
> > > >> API
> > > >> > > > simple.
> > > >> > > >
> > > >> > > > In my current proposal, metadata version returned in the fetch
> > > >> response
> > > >> > > > will be stored with the offset together. More specifically,
> the
> > > >> > > > metadata_epoch in the new offset topic schema will be the
> > largest
> > > >> > > > metadata_epoch from all the MetadataResponse and FetchResponse
> > > ever
> > > >> > > > received by this consumer.
> > > >> > > >
> > > >> > > > We probably don't have to change the consumer API for
> > > >> > > > commitSync(Map<TopicPartition, OffsetAndMetadata>). If user
> > calls
> > > >> > > > commitSync(...) to commit offset 10 for a given partition, for
> > > most
> > > >> > > > use-cases, this consumer instance should have consumed message
> > > with
> > > >> > > offset
> > > >> > > > 9 from this partition, in which case the consumer can remember
> > and
> > > >> use
> > > >> > > the
> > > >> > > > metadata_epoch from the corresponding FetchResponse when
> > > committing
> > > >> > > offset.
> > > >> > > > If user calls commitSync(..) to commit offset 10 for a given
> > > >> partition
> > > >> > > > without having consumed the message with offset 9 using this
> > > >> consumer
> > > >> > > > instance, this is probably an advanced use-case. In this case
> > the
> > > >> > > advanced
> > > >> > > > user can retrieve the metadata_epoch using the newly added
> > > >> > > metadataEpoch()
> > > >> > > > API after it fetches the message with offset 9 (probably from
> > > >> another
> > > >> > > > consumer instance) and encode this metadata_epoch in the
> > > >> > > > string OffsetAndMetadata.metadata. Do you think this solution
> > > would
> > > >> > work?
> > > >> > > >
> > > >> > > > By "not sure that I fully understand your latest suggestion",
> > are
> > > >> you
> > > >> > > > referring to solution related to unclean leader election using
> > > >> > > leader_epoch
> > > >> > > > in my previous email?
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Dong
> > > >> > > >
> > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io>
> > wrote:
> > > >> > > >
> > > >> > > > > Hi, Dong,
> > > >> > > > >
> > > >> > > > > Not sure that I fully understand your latest suggestion.
> > > >> Returning an
> > > >> > > > ever
> > > >> > > > > growing global metadata version itself is no ideal, but is
> > fine.
> > > >> My
> > > >> > > > > question is whether the metadata version returned in the
> fetch
> > > >> > response
> > > >> > > > > needs to be stored with the offset together if offsets are
> > > stored
> > > >> > > > > externally. If so, we also have to change the consumer API
> for
> > > >> > > > commitSync()
> > > >> > > > > and need to worry about compatibility. If we don't store the
> > > >> metadata
> > > >> > > > > version together with the offset, on a consumer restart,
> it's
> > > not
> > > >> > clear
> > > >> > > > how
> > > >> > > > > we can ensure the metadata in the consumer is high enough
> > since
> > > >> there
> > > >> > > is
> > > >> > > > no
> > > >> > > > > metadata version to compare with.
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > >
> > > >> > > > > Jun
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <
> lindon...@gmail.com
> > >
> > > >> > wrote:
> > > >> > > > >
> > > >> > > > > > Hey Jun,
> > > >> > > > > >
> > > >> > > > > > Thanks much for the explanation.
> > > >> > > > > >
> > > >> > > > > > I understand the advantage of partition_epoch over
> > > >> metadata_epoch.
> > > >> > My
> > > >> > > > > > current concern is that the use of leader_epoch and the
> > > >> > > partition_epoch
> > > >> > > > > > requires us considerable change to consumer's public API
> to
> > > take
> > > >> > care
> > > >> > > > of
> > > >> > > > > > the case where user stores offset externally. For example,
> > > >> > > *consumer*.
> > > >> > > > > > *commitSync*(..) would have to take a map whose value is
> > > >> <offset,
> > > >> > > > > metadata,
> > > >> > > > > > leader epoch, partition epoch>. *consumer*.*seek*(...)
> would
> > > >> also
> > > >> > > need
> > > >> > > > > > leader_epoch and partition_epoch as parameter. Technically
> > we
> > > >> can
> > > >> > > > > probably
> > > >> > > > > > still make it work in a backward compatible manner after
> > > careful
> > > >> > > design
> > > >> > > > > and
> > > >> > > > > > discussion. But these changes can make the consumer's
> > > interface
> > > >> > > > > > unnecessarily complex for more users who do not store
> offset
> > > >> > > > externally.
> > > >> > > > > >
> > > >> > > > > > After thinking more about it, we can address all problems
> > > >> discussed
> > > >> > > by
> > > >> > > > > only
> > > >> > > > > > using the metadata_epoch without introducing leader_epoch
> or
> > > the
> > > >> > > > > > partition_epoch. The current KIP describes the changes to
> > the
> > > >> > > consumer
> > > >> > > > > API
> > > >> > > > > > and how the new API can be used if user stores offset
> > > >> externally.
> > > >> > In
> > > >> > > > > order
> > > >> > > > > > to address the scenario you described earlier, we can
> > include
> > > >> > > > > > metadata_epoch in the FetchResponse and the
> > > LeaderAndIsrRequest.
> > > >> > > > Consumer
> > > >> > > > > > remembers the largest metadata_epoch from all the
> > > FetchResponse
> > > >> it
> > > >> > > has
> > > >> > > > > > received. The metadata_epoch committed with the offset,
> > either
> > > >> > within
> > > >> > > > or
> > > >> > > > > > outside Kafka, should be the largest metadata_epoch across
> > all
> > > >> > > > > > FetchResponse and MetadataResponse ever received by this
> > > >> consumer.
> > > >> > > > > >
> > > >> > > > > > The drawback of using only the metadata_epoch is that we
> can
> > > not
> > > >> > > always
> > > >> > > > > do
> > > >> > > > > > the smart offset reset in case of unclean leader election
> > > which
> > > >> you
> > > >> > > > > > mentioned earlier. But in most case, unclean leader
> election
> > > >> > probably
> > > >> > > > > > happens when consumer is not rebalancing/restarting. In
> > these
> > > >> > cases,
> > > >> > > > > either
> > > >> > > > > > consumer is not directly affected by unclean leader
> election
> > > >> since
> > > >> > it
> > > >> > > > is
> > > >> > > > > > not consuming from the end of the log, or consumer can
> > derive
> > > >> the
> > > >> > > > > > leader_epoch from the most recent message received before
> it
> > > >> sees
> > > >> > > > > > OffsetOutOfRangeException. So I am not sure it is worth
> > adding
> > > >> the
> > > >> > > > > > leader_epoch to consumer API to address the remaining
> corner
> > > >> case.
> > > >> > > What
> > > >> > > > > do
> > > >> > > > > > you think?
> > > >> > > > > >
> > > >> > > > > > Thanks,
> > > >> > > > > > Dong
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io
> >
> > > >> wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi, Dong,
> > > >> > > > > > >
> > > >> > > > > > > Thanks for the reply.
> > > >> > > > > > >
> > > >> > > > > > > To solve the topic recreation issue, we could use
> either a
> > > >> global
> > > >> > > > > > metadata
> > > >> > > > > > > version or a partition level epoch. But either one will
> > be a
> > > >> new
> > > >> > > > > concept,
> > > >> > > > > > > right? To me, the latter seems more natural. It also
> makes
> > > it
> > > >> > > easier
> > > >> > > > to
> > > >> > > > > > > detect if a consumer's offset is still valid after a
> topic
> > > is
> > > >> > > > > recreated.
> > > >> > > > > > As
> > > >> > > > > > > you pointed out, we don't need to store the partition
> > epoch
> > > in
> > > >> > the
> > > >> > > > > > message.
> > > >> > > > > > > The following is what I am thinking. When a partition is
> > > >> created,
> > > >> > > we
> > > >> > > > > can
> > > >> > > > > > > assign a partition epoch from an ever-increasing global
> > > >> counter
> > > >> > and
> > > >> > > > > store
> > > >> > > > > > > it in /brokers/topics/[topic]/partitions/[partitionId]
> in
> > > ZK.
> > > >> > The
> > > >> > > > > > > partition
> > > >> > > > > > > epoch is propagated to every broker. The consumer will
> be
> > > >> > tracking
> > > >> > > a
> > > >> > > > > > tuple
> > > >> > > > > > > of <offset, leader epoch, partition epoch> for offsets.
> > If a
> > > >> > topic
> > > >> > > is
> > > >> > > > > > > recreated, it's possible that a consumer's offset and
> > leader
> > > >> > epoch
> > > >> > > > > still
> > > >> > > > > > > match that in the broker, but partition epoch won't be.
> In
> > > >> this
> > > >> > > case,
> > > >> > > > > we
> > > >> > > > > > > can potentially still treat the consumer's offset as out
> > of
> > > >> range
> > > >> > > and
> > > >> > > > > > reset
> > > >> > > > > > > the offset based on the offset reset policy in the
> > consumer.
> > > >> This
> > > >> > > > seems
> > > >> > > > > > > harder to do with a global metadata version.
> > > >> > > > > > >
> > > >> > > > > > > Jun
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <
> > > >> lindon...@gmail.com>
> > > >> > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Hey Jun,
> > > >> > > > > > > >
> > > >> > > > > > > > This is a very good example. After thinking through
> this
> > > in
> > > >> > > > detail, I
> > > >> > > > > > > agree
> > > >> > > > > > > > that we need to commit offset with leader epoch in
> order
> > > to
> > > >> > > address
> > > >> > > > > > this
> > > >> > > > > > > > example.
> > > >> > > > > > > >
> > > >> > > > > > > > I think the remaining question is how to address the
> > > >> scenario
> > > >> > > that
> > > >> > > > > the
> > > >> > > > > > > > topic is deleted and re-created. One possible solution
> > is
> > > to
> > > >> > > commit
> > > >> > > > > > > offset
> > > >> > > > > > > > with both the leader epoch and the metadata version.
> The
> > > >> logic
> > > >> > > and
> > > >> > > > > the
> > > >> > > > > > > > implementation of this solution does not require a new
> > > >> concept
> > > >> > > > (e.g.
> > > >> > > > > > > > partition epoch) and it does not require any change to
> > the
> > > >> > > message
> > > >> > > > > > format
> > > >> > > > > > > > or leader epoch. It also allows us to order the
> metadata
> > > in
> > > >> a
> > > >> > > > > > > > straightforward manner which may be useful in the
> > future.
> > > >> So it
> > > >> > > may
> > > >> > > > > be
> > > >> > > > > > a
> > > >> > > > > > > > better solution than generating a random partition
> epoch
> > > >> every
> > > >> > > time
> > > >> > > > > we
> > > >> > > > > > > > create a partition. Does this sound reasonable?
> > > >> > > > > > > >
> > > >> > > > > > > > Previously one concern with using the metadata version
> > is
> > > >> that
> > > >> > > > > consumer
> > > >> > > > > > > > will be forced to refresh metadata even if metadata
> > > version
> > > >> is
> > > >> > > > > > increased
> > > >> > > > > > > > due to topics that the consumer is not interested in.
> > Now
> > > I
> > > >> > > > realized
> > > >> > > > > > that
> > > >> > > > > > > > this is probably not a problem. Currently client will
> > > >> refresh
> > > >> > > > > metadata
> > > >> > > > > > > > either due to InvalidMetadataException in the response
> > > from
> > > >> > > broker
> > > >> > > > or
> > > >> > > > > > due
> > > >> > > > > > > > to metadata expiry. The addition of the metadata
> version
> > > >> should
> > > >> > > > > > increase
> > > >> > > > > > > > the overhead of metadata refresh caused by
> > > >> > > > InvalidMetadataException.
> > > >> > > > > If
> > > >> > > > > > > > client refresh metadata due to expiry and it receives
> a
> > > >> > metadata
> > > >> > > > > whose
> > > >> > > > > > > > version is lower than the current metadata version, we
> > can
> > > >> > reject
> > > >> > > > the
> > > >> > > > > > > > metadata but still reset the metadata age, which
> > > essentially
> > > >> > keep
> > > >> > > > the
> > > >> > > > > > > > existing behavior in the client.
> > > >> > > > > > > >
> > > >> > > > > > > > Thanks much,
> > > >> > > > > > > > Dong
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Reply via email to