Hey Jason,

Thanks for the comments.

On Mon, Jan 8, 2018 at 2:27 PM, Jason Gustafson <ja...@confluent.io> wrote:

> >
> > I am not sure I understand the benefit of incrementing this epoch after
> > topic deletion. At a high level, client can not differentiate between
> > topic deletion and topic creation when the global epoch is incremented.
> > Can you provide more specific use-case?
>
>
> Say you send two metadata requests to two separate brokers. In the
> responses, one of them says a certain topic exists and one says it does
> not. Who is right? My suggestion is to bump the topic epoch on deletion and
> include it in the metadata response when returning
> UNKNOWN_TOPIC_OR_PARTITION. Then the client always knows which metadata is
> more current (if not necessarily up to date). Because of this ambiguity,
> Kafka clients currently have no choice but retry on unknown topic errors.
> Yes, you can say it is already handled, but this gives us some better
> options in the future. In the consumer, users are often asking to be
> notified when they attempt to fetch from unknown topics, for example,
> because it could indicate a configuration problem. We have difficulty
> supporting this at the moment.
>
> > Currently when broker returns UNKNOWN_TOPIC_OR_PARTITION, it means that
> > the topic is not follower or leader of this partition. Note that
> > UNKNOWN_TOPIC_OR_PARTITION does not necessarily tell client whether this
> > partition exists on other broker or not. UNKNOWN_TOPIC_OR_PARTITION can
> > be caused either when the broker has not yet processed the latest
> > LeaderAndIsrRequest, or the client is using outdated metadata.
>

My bad. I was thinking about the UNKNOWN_TOPIC_OR_PARTITION from
ProduceResponse or FetchResponse, but we are actually discussing the error
from MetadataResponse.



>
>
> I don't think this is right. Metadata is propagated through the
> UpdateMetadata request which the controller sends to all brokers. Brokers
> will return UNKNOWN_TOPIC_OR_PARTITION in a metadata response if they don't
> have metadata cached for the requested topic.
>
> There is one problem though which I think might be what you're getting at.
> After a topic is deleted, the controller will leave it out of future
> UpdateMetadata requests, which means the deleted epoch would not be
> propagated to all brokers and we'd be stuck in the current state. Suppose
> instead that when a topic is deleted, we 1) bump the topic epoch, and 2)
> set an expiration time (say several hours). When the expiration time is
> reached, we delete the topic metadata in zookeeper; until then, the
> controller continues to propagate it as usual with a flag indicating it no
> longer exists. The point of the epoch is solving edge cases around topic
> deletion and recreation, so the expiration timer gives clients a window to
> observe the deletion before the metadata is removed. It also ensures that
> topic metadata is eventually cleaned up following deletion.
>
> What do you think?
>

Yeah, that is what I am thinking. Under the current proposal, when a topic is
deleted, all topic-specific information is removed immediately and the
client will not see this topic in the latest MetadataResponse. This makes
it hard to distinguish between the metadata before and after the topic
deletion, even if we bump the global topic_epoch in ZooKeeper.

I think your approach would allow the user to distinguish between the
metadata before and after the topic deletion. I also agree that this could
potentially be useful to users. I am just not sure whether we already have
a use-case good enough to make the additional complexity worthwhile. It
seems that this feature is largely independent of the main problem of this
KIP. Could we add this as future work?
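
For reference, here is a rough sketch of how a client might pick the more
current of two metadata responses, assuming the topic epoch were bumped on
deletion and returned alongside UNKNOWN_TOPIC_OR_PARTITION. TopicView and
moreCurrent() are made-up names for illustration only; nothing like this
exists in the KIP or in the client today.

```java
// Hypothetical illustration only; not part of the KIP or the Kafka client.
final class TopicView {
    final int topicEpoch;   // assumed to be bumped on both creation and deletion
    final boolean exists;   // false models UNKNOWN_TOPIC_OR_PARTITION

    TopicView(int topicEpoch, boolean exists) {
        this.topicEpoch = topicEpoch;
        this.exists = exists;
    }

    // Returns the view with the higher epoch; on a tie the first argument wins.
    static TopicView moreCurrent(TopicView a, TopicView b) {
        return b.topicEpoch > a.topicEpoch ? b : a;
    }

    public static void main(String[] args) {
        TopicView fromBrokerA = new TopicView(5, true);   // topic still exists
        TopicView fromBrokerB = new TopicView(6, false);  // deletion already seen
        TopicView winner = moreCurrent(fromBrokerA, fromBrokerB);
        System.out.println("exists=" + winner.exists + " epoch=" + winner.topicEpoch);
    }
}
```

Note that this comparison only helps while the deleted topic's epoch is
still being propagated, which is what the expiration window in your
suggestion would provide.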



> > In comparison to byte[], String has the benefit of being more readable and
> > it is also the same type of the existing metadata field, which is used
> > for a similar purpose by user. Do you think this is reasonable?
>
>
> I don't have too strong of a feeling about it. I'm not sure how important
> readability is since it's intended to be opaque to the user. To clarify a
> little bit, I think we should continue to send the topic and leader epochs
> in the protocol directly as separate fields. It's only when we surface it
> through the consumer API that we add some obscurity since we don't want
> users to depend on the fields directly and we don't want to make API
> changes in the future if we need to add something else which is also
> internal. In fact, rather than using byte[] or String directly, perhaps we
> could just expose it as an object and give it a readable toString()?
>

I also don't feel strongly about this. String just seems like a natural
choice given that we already have similar JSON-formatted strings in
ZooKeeper and we use String for the existing metadata in OffsetAndMetadata.
I just feel it is probably better to use some Java class than a raw
byte array in our Consumer API. But I don't have a strong reason.

If we expose it as an object, I am not sure how the user is going to store
and retrieve this value from an external store. If the user stores
object.toString() in the external store, how can the user convert the
string back to the object?
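
To make the question concrete: an object-based API would presumably need an
explicit, versioned encoding in addition to a readable toString(). Below is
a minimal sketch of what that might look like. The class name OffsetEpoch
comes from Jun's earlier suggestion, but encode(), decode() and the
"version:topic_epoch:leader_epoch" layout are purely my assumptions for
illustration and are not part of the KIP.

```java
import java.util.Objects;

// Hypothetical opaque holder for (topic_epoch, leader_epoch); illustration only.
final class OffsetEpoch {
    private static final int VERSION = 1;  // assumed encoding version
    private final int topicEpoch;
    private final int leaderEpoch;

    OffsetEpoch(int topicEpoch, int leaderEpoch) {
        this.topicEpoch = topicEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    // Stable form intended for an external store (unlike toString()).
    String encode() {
        return VERSION + ":" + topicEpoch + ":" + leaderEpoch;
    }

    // Parses the form produced by encode(); rejects unknown versions.
    static OffsetEpoch decode(String encoded) {
        String[] parts = encoded.split(":");
        if (parts.length != 3 || Integer.parseInt(parts[0]) != VERSION) {
            throw new IllegalArgumentException("Unsupported encoding: " + encoded);
        }
        return new OffsetEpoch(Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
    }

    // Readable form for logs only; callers should not parse it.
    @Override
    public String toString() {
        return "OffsetEpoch(topicEpoch=" + topicEpoch + ", leaderEpoch=" + leaderEpoch + ")";
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof OffsetEpoch)) return false;
        OffsetEpoch other = (OffsetEpoch) o;
        return topicEpoch == other.topicEpoch && leaderEpoch == other.leaderEpoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicEpoch, leaderEpoch);
    }
}
```

With something like this, a user would store offsetEpoch.encode() next to
the offset and call OffsetEpoch.decode(...) before seek(). But then the
encoded string is effectively the String field I proposed, so I am not sure
the object buys us much beyond the nicer toString().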

And yes, this only matters to the consumer API. The current KIP continues
to send leader_epoch and topic_epoch as separate fields in request/response
and the offset topic schema.

Thanks much,
Dong



>
> Thanks,
> Jason
>
>
> On Fri, Jan 5, 2018 at 5:12 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jason,
> >
> > Thanks a lot for the comments. I will comment inline. And I have updated
> > the KIP accordingly. Could you take another look?
> >
> > On Fri, Jan 5, 2018 at 11:15 AM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Sorry for the late reply. I think the latest revision is looking good.
> I
> > > have a few minor suggestions:
> > >
> > > 1. The name "partition_epoch" makes me think it changes independently
> at
> > > the partition level, but all partitions for a topic should have the
> same
> > > epoch. Maybe "topic_epoch" is nearer the mark?
> > >
> >
> > Actually, in the current proposal, partitions of the same topic will have
> > different epoch. Every time a new partition is created, either due to
> topic
> > creation or partition expansion, the global epoch is incremented by 1 and
> > is assigned to that partition. This is probably why we currently call it
> > partition_epoch.
> >
> > Thinking about your idea more, one alternative approach following your
> idea
> > is to use a topic_epoch that is incremented by 1 whenever we create a
> > topic. We should store a single topic_epoch in
> > znode /brokers/topics/[topic] without storing the list of partition_epoch
> > for all partitions. This same epoch will be used for the new partitions
> > after partition expansion of the existing topic. This approach has a
> > simpler znode format than the existing KIP and it still allows us to
> detect
> > topic created after topic deletion. I think this is better. I have
> updated
> > the KIP with this approach.
> >
> >
> > > 2. Should we increment this epoch when a topic is deleted also? When
> the
> > > broker returns an UNKNOWN_TOPIC_OR_PARTITION error in a metadata
> > response,
> > > we can also include the latest partition epoch, which would allow the
> > > client to disambiguate the error if it has seen more recent metadata.
> > >
> >
> > I am not sure I understand the benefit of incrementing this epoch after
> > topic deletion. At a high level, client can not differentiate between
> topic
> > deletion and topic creation when the global epoch is incremented. Can you
> > provide more specific use-case?
> >
> > Currently when broker returns UNKNOWN_TOPIC_OR_PARTITION, it means that
> the
> > topic is not follower or leader of this partition. Note that
> > UNKNOWN_TOPIC_OR_PARTITION does not necessarily tell client whether this
> > partition exists on other broker or not. UNKNOWN_TOPIC_OR_PARTITION can
> be
> > caused either when the broker has not yet processed the latest
> > LeaderAndIsrRequest, or the client is using outdated metadata. In either
> > case, the client needs to retry and possibly refresh metadata, which is
> > already done by client with the current Kafka implementation. So it seems
> > that we don't have a problem to fix here?
> >
> >
> >
> > > 3. I am still wondering whether it is a good idea to expose these
> epochs
> > in
> > > the consumer API. As an alternative, have you considered representing
> the
> > > data as an opaque blob of bytes? For example:
> > >
> > > class OffsetAndMetadata {
> > >   long offset;
> > >   byte[] offsetMetadata;
> > >   String metadata;
> > > }
> > >
> > > Admittedly, the naming is a bit annoying, but we can probably come up
> > with
> > > something better. Internally the byte array would have a version. If in
> > the
> > > future we have anything else we need to add, we can update the version
> > and
> > > we wouldn't need any new APIs.
> > >
> > > The corresponding seek() and position() APIs might look something like
> > > this:
> > >
> > > void seek(TopicPartition partition, long offset, byte[]
> offsetMetadata);
> > > byte[] positionMetadata(TopicPartition partition);
> > >
> > > What do you think?
> > >
> >
> > I think it is a very good idea to consolidate the new information into a
> > single field rather than explicitly listing them with specific types.
> This
> > provides us the advantage of evolving the information in this field in
> the
> > future.
> >
> > But I probably would prefer to use String rather than byte[] as the type
> of
> > this new field. This string can probably have the following json format:
> >
> > {
> >   "version": 1,
> >   "topic_epoch": int,
> >   "leader_epoch": int
> > }
> >
> > In comparison to byte[], String has the benefit of being more readable
> and
> > it is also the same type of the existing metadata field, which is used
> for
> > a similar purpose by user. Do you think this is reasonable?
> >
> >
> >
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jun, Jason,
> > > >
> > > > Thanks much for all the feedback. I have updated the KIP based on the
> > > > latest discussion. Can you help check whether it looks good?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > Hmm... thinking about this more, I am not sure that the proposed
> API
> > is
> > > > > sufficient. For users that store offset externally, we probably
> need
> > > > extra
> > > > > API to return the leader_epoch and partition_epoch for all
> partitions
> > > > that
> > > > > consumers are consuming. I suppose these users currently use
> > position()
> > > > to
> > > > > get the offset. Thus we probably need a new method
> > > positionWithEpoch(..)
> > > > to
> > > > > return <offset, partition_epoch, leader_epoch>. Does this sound
> > > > reasonable?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > >> Hi, Dong,
> > > > >>
> > > > >> Yes, that's what I am thinking. OffsetEpoch will be composed of
> > > > >> (partition_epoch,
> > > > >> leader_epoch).
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >>
> > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > >>
> > > > >> > Hey Jun,
> > > > >> >
> > > > > >> > Thanks much. I like the new API that you proposed. I am not
> > sure
> > > > >> what
> > > > >> > you exactly mean by offset_epoch. I suppose that we can use the
> > pair
> > > > of
> > > > >> > (partition_epoch, leader_epoch) as the offset_epoch, right?
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Dong
> > > > >> >
> > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io>
> wrote:
> > > > >> >
> > > > >> > > Hi, Dong,
> > > > >> > >
> > > > >> > > Got it. The api that you proposed works. The question is
> whether
> > > > >> that's
> > > > >> > the
> > > > >> > > api that we want to have in the long term. My concern is that
> > > while
> > > > >> the
> > > > >> > api
> > > > >> > > change is simple, the new api seems harder to explain and use.
> > For
> > > > >> > example,
> > > > >> > > a consumer storing offsets externally now needs to call
> > > > >> > > waitForMetadataUpdate() after calling seek().
> > > > >> > >
> > > > >> > > An alternative approach is to make the following compatible
> api
> > > > >> changes
> > > > >> > in
> > > > >> > > Consumer.
> > > > >> > > * Add an additional OffsetEpoch field in OffsetAndMetadata.
> (no
> > > need
> > > > >> to
> > > > >> > > change the CommitSync() api)
> > > > >> > > * Add a new api seek(TopicPartition partition, long offset,
> > > > >> OffsetEpoch
> > > > >> > > offsetEpoch). We can potentially deprecate the old api
> > > > >> > seek(TopicPartition
> > > > >> > > partition, long offset) in the future.
> > > > >> > >
> > > > >> > > The alternative approach has similar amount of api changes as
> > > yours
> > > > >> but
> > > > >> > has
> > > > >> > > the following benefits.
> > > > >> > > 1. The api works in a similar way as how offset management
> works
> > > now
> > > > >> and
> > > > >> > is
> > > > >> > > probably what we want in the long term.
> > > > >> > > 2. It can reset offsets better when there is data loss due to
> > > > unclean
> > > > >> > > leader election or correlated replica failure.
> > > > >> > > 3. It can reset offsets better when topic is recreated.
> > > > >> > >
> > > > >> > > Thanks,
> > > > >> > >
> > > > >> > > Jun
> > > > >> > >
> > > > >> > >
> > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com
> >
> > > > wrote:
> > > > >> > >
> > > > >> > > > Hey Jun,
> > > > >> > > >
> > > > >> > > > Yeah I agree that ideally we don't want an ever growing
> global
> > > > >> metadata
> > > > >> > > > version. I just think it may be more desirable to keep the
> > > > consumer
> > > > >> API
> > > > >> > > > simple.
> > > > >> > > >
> > > > >> > > > In my current proposal, metadata version returned in the
> fetch
> > > > >> response
> > > > >> > > > will be stored with the offset together. More specifically,
> > the
> > > > >> > > > metadata_epoch in the new offset topic schema will be the
> > > largest
> > > > >> > > > metadata_epoch from all the MetadataResponse and
> FetchResponse
> > > > ever
> > > > >> > > > received by this consumer.
> > > > >> > > >
> > > > >> > > > We probably don't have to change the consumer API for
> > > > >> > > > commitSync(Map<TopicPartition, OffsetAndMetadata>). If user
> > > calls
> > > > >> > > > commitSync(...) to commit offset 10 for a given partition,
> for
> > > > most
> > > > >> > > > use-cases, this consumer instance should have consumed
> message
> > > > with
> > > > >> > > offset
> > > > >> > > > 9 from this partition, in which case the consumer can
> remember
> > > and
> > > > >> use
> > > > >> > > the
> > > > >> > > > metadata_epoch from the corresponding FetchResponse when
> > > > committing
> > > > >> > > offset.
> > > > >> > > > If user calls commitSync(..) to commit offset 10 for a given
> > > > >> partition
> > > > >> > > > without having consumed the message with offset 9 using this
> > > > >> consumer
> > > > >> > > > instance, this is probably an advanced use-case. In this
> case
> > > the
> > > > >> > > advanced
> > > > >> > > > user can retrieve the metadata_epoch using the newly added
> > > > >> > > metadataEpoch()
> > > > >> > > > API after it fetches the message with offset 9 (probably
> from
> > > > >> another
> > > > >> > > > consumer instance) and encode this metadata_epoch in the
> > > > >> > > > string OffsetAndMetadata.metadata. Do you think this
> solution
> > > > would
> > > > >> > work?
> > > > >> > > >
> > > > >> > > > By "not sure that I fully understand your latest
> suggestion",
> > > are
> > > > >> you
> > > > >> > > > referring to solution related to unclean leader election
> using
> > > > >> > > leader_epoch
> > > > >> > > > in my previous email?
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > > Dong
> > > > >> > > >
> > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io>
> > > wrote:
> > > > >> > > >
> > > > >> > > > > Hi, Dong,
> > > > >> > > > >
> > > > >> > > > > Not sure that I fully understand your latest suggestion.
> > > > >> Returning an
> > > > >> > > > ever
> > > > > >> > > > > growing global metadata version itself is not ideal, but is
> > > fine.
> > > > >> My
> > > > >> > > > > question is whether the metadata version returned in the
> > fetch
> > > > >> > response
> > > > >> > > > > needs to be stored with the offset together if offsets are
> > > > stored
> > > > >> > > > > externally. If so, we also have to change the consumer API
> > for
> > > > >> > > > commitSync()
> > > > >> > > > > and need to worry about compatibility. If we don't store
> the
> > > > >> metadata
> > > > >> > > > > version together with the offset, on a consumer restart,
> > it's
> > > > not
> > > > >> > clear
> > > > >> > > > how
> > > > >> > > > > we can ensure the metadata in the consumer is high enough
> > > since
> > > > >> there
> > > > >> > > is
> > > > >> > > > no
> > > > >> > > > > metadata version to compare with.
> > > > >> > > > >
> > > > >> > > > > Thanks,
> > > > >> > > > >
> > > > >> > > > > Jun
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > >> > wrote:
> > > > >> > > > >
> > > > >> > > > > > Hey Jun,
> > > > >> > > > > >
> > > > >> > > > > > Thanks much for the explanation.
> > > > >> > > > > >
> > > > >> > > > > > I understand the advantage of partition_epoch over
> > > > >> metadata_epoch.
> > > > >> > My
> > > > >> > > > > > current concern is that the use of leader_epoch and the
> > > > >> > > partition_epoch
> > > > >> > > > > > requires us considerable change to consumer's public API
> > to
> > > > take
> > > > >> > care
> > > > >> > > > of
> > > > >> > > > > > the case where user stores offset externally. For
> example,
> > > > >> > > *consumer*.
> > > > >> > > > > > *commitSync*(..) would have to take a map whose value is
> > > > >> <offset,
> > > > >> > > > > metadata,
> > > > >> > > > > > leader epoch, partition epoch>. *consumer*.*seek*(...)
> > would
> > > > >> also
> > > > >> > > need
> > > > >> > > > > > leader_epoch and partition_epoch as parameter.
> Technically
> > > we
> > > > >> can
> > > > >> > > > > probably
> > > > >> > > > > > still make it work in a backward compatible manner after
> > > > careful
> > > > >> > > design
> > > > >> > > > > and
> > > > >> > > > > > discussion. But these changes can make the consumer's
> > > > interface
> > > > >> > > > > > unnecessarily complex for more users who do not store
> > offset
> > > > >> > > > externally.
> > > > >> > > > > >
> > > > >> > > > > > After thinking more about it, we can address all
> problems
> > > > >> discussed
> > > > >> > > by
> > > > >> > > > > only
> > > > >> > > > > > using the metadata_epoch without introducing
> leader_epoch
> > or
> > > > the
> > > > >> > > > > > partition_epoch. The current KIP describes the changes
> to
> > > the
> > > > >> > > consumer
> > > > >> > > > > API
> > > > >> > > > > > and how the new API can be used if user stores offset
> > > > >> externally.
> > > > >> > In
> > > > >> > > > > order
> > > > >> > > > > > to address the scenario you described earlier, we can
> > > include
> > > > >> > > > > > metadata_epoch in the FetchResponse and the
> > > > LeaderAndIsrRequest.
> > > > >> > > > Consumer
> > > > >> > > > > > remembers the largest metadata_epoch from all the
> > > > FetchResponse
> > > > >> it
> > > > >> > > has
> > > > >> > > > > > received. The metadata_epoch committed with the offset,
> > > either
> > > > >> > within
> > > > >> > > > or
> > > > >> > > > > > outside Kafka, should be the largest metadata_epoch
> across
> > > all
> > > > >> > > > > > FetchResponse and MetadataResponse ever received by this
> > > > >> consumer.
> > > > >> > > > > >
> > > > >> > > > > > The drawback of using only the metadata_epoch is that we
> > can
> > > > not
> > > > >> > > always
> > > > >> > > > > do
> > > > >> > > > > > the smart offset reset in case of unclean leader
> election
> > > > which
> > > > >> you
> > > > >> > > > > > mentioned earlier. But in most case, unclean leader
> > election
> > > > >> > probably
> > > > >> > > > > > happens when consumer is not rebalancing/restarting. In
> > > these
> > > > >> > cases,
> > > > >> > > > > either
> > > > >> > > > > > consumer is not directly affected by unclean leader
> > election
> > > > >> since
> > > > >> > it
> > > > >> > > > is
> > > > >> > > > > > not consuming from the end of the log, or consumer can
> > > derive
> > > > >> the
> > > > >> > > > > > leader_epoch from the most recent message received
> before
> > it
> > > > >> sees
> > > > >> > > > > > OffsetOutOfRangeException. So I am not sure it is worth
> > > adding
> > > > >> the
> > > > >> > > > > > leader_epoch to consumer API to address the remaining
> > corner
> > > > >> case.
> > > > >> > > What
> > > > >> > > > > do
> > > > >> > > > > > you think?
> > > > >> > > > > >
> > > > >> > > > > > Thanks,
> > > > >> > > > > > Dong
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <
> j...@confluent.io
> > >
> > > > >> wrote:
> > > > >> > > > > >
> > > > >> > > > > > > Hi, Dong,
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks for the reply.
> > > > >> > > > > > >
> > > > >> > > > > > > To solve the topic recreation issue, we could use
> > either a
> > > > >> global
> > > > >> > > > > > metadata
> > > > >> > > > > > > version or a partition level epoch. But either one
> will
> > > be a
> > > > >> new
> > > > >> > > > > concept,
> > > > >> > > > > > > right? To me, the latter seems more natural. It also
> > makes
> > > > it
> > > > >> > > easier
> > > > >> > > > to
> > > > >> > > > > > > detect if a consumer's offset is still valid after a
> > topic
> > > > is
> > > > >> > > > > recreated.
> > > > >> > > > > > As
> > > > >> > > > > > > you pointed out, we don't need to store the partition
> > > epoch
> > > > in
> > > > >> > the
> > > > >> > > > > > message.
> > > > >> > > > > > > The following is what I am thinking. When a partition
> is
> > > > >> created,
> > > > >> > > we
> > > > >> > > > > can
> > > > >> > > > > > > assign a partition epoch from an ever-increasing
> global
> > > > >> counter
> > > > >> > and
> > > > >> > > > > store
> > > > >> > > > > > > it in /brokers/topics/[topic]/
> partitions/[partitionId]
> > in
> > > > ZK.
> > > > >> > The
> > > > >> > > > > > > partition
> > > > >> > > > > > > epoch is propagated to every broker. The consumer will
> > be
> > > > >> > tracking
> > > > >> > > a
> > > > >> > > > > > tuple
> > > > >> > > > > > > of <offset, leader epoch, partition epoch> for
> offsets.
> > > If a
> > > > >> > topic
> > > > >> > > is
> > > > >> > > > > > > recreated, it's possible that a consumer's offset and
> > > leader
> > > > >> > epoch
> > > > >> > > > > still
> > > > >> > > > > > > match that in the broker, but partition epoch won't
> be.
> > In
> > > > >> this
> > > > >> > > case,
> > > > >> > > > > we
> > > > >> > > > > > > can potentially still treat the consumer's offset as
> out
> > > of
> > > > >> range
> > > > >> > > and
> > > > >> > > > > > reset
> > > > >> > > > > > > the offset based on the offset reset policy in the
> > > consumer.
> > > > >> This
> > > > >> > > > seems
> > > > >> > > > > > > harder to do with a global metadata version.
> > > > >> > > > > > >
> > > > >> > > > > > > Jun
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <
> > > > >> lindon...@gmail.com>
> > > > >> > > > wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > > Hey Jun,
> > > > >> > > > > > > >
> > > > >> > > > > > > > This is a very good example. After thinking through
> > this
> > > > in
> > > > >> > > > detail, I
> > > > >> > > > > > > agree
> > > > >> > > > > > > > that we need to commit offset with leader epoch in
> > order
> > > > to
> > > > >> > > address
> > > > >> > > > > > this
> > > > >> > > > > > > > example.
> > > > >> > > > > > > >
> > > > >> > > > > > > > I think the remaining question is how to address the
> > > > >> scenario
> > > > >> > > that
> > > > >> > > > > the
> > > > >> > > > > > > > topic is deleted and re-created. One possible
> solution
> > > is
> > > > to
> > > > >> > > commit
> > > > >> > > > > > > offset
> > > > >> > > > > > > > with both the leader epoch and the metadata version.
> > The
> > > > >> logic
> > > > >> > > and
> > > > >> > > > > the
> > > > >> > > > > > > > implementation of this solution does not require a
> new
> > > > >> concept
> > > > >> > > > (e.g.
> > > > >> > > > > > > > partition epoch) and it does not require any change
> to
> > > the
> > > > >> > > message
> > > > >> > > > > > format
> > > > >> > > > > > > > or leader epoch. It also allows us to order the
> > metadata
> > > > in
> > > > >> a
> > > > >> > > > > > > > straightforward manner which may be useful in the
> > > future.
> > > > >> So it
> > > > >> > > may
> > > > >> > > > > be
> > > > >> > > > > > a
> > > > >> > > > > > > > better solution than generating a random partition
> > epoch
> > > > >> every
> > > > >> > > time
> > > > >> > > > > we
> > > > >> > > > > > > > create a partition. Does this sound reasonable?
> > > > >> > > > > > > >
> > > > >> > > > > > > > Previously one concern with using the metadata
> version
> > > is
> > > > >> that
> > > > >> > > > > consumer
> > > > >> > > > > > > > will be forced to refresh metadata even if metadata
> > > > version
> > > > >> is
> > > > >> > > > > > increased
> > > > >> > > > > > > > due to topics that the consumer is not interested
> in.
> > > Now
> > > > I
> > > > >> > > > realized
> > > > >> > > > > > that
> > > > >> > > > > > > > this is probably not a problem. Currently client
> will
> > > > >> refresh
> > > > >> > > > > metadata
> > > > >> > > > > > > > either due to InvalidMetadataException in the
> response
> > > > from
> > > > >> > > broker
> > > > >> > > > or
> > > > >> > > > > > due
> > > > >> > > > > > > > to metadata expiry. The addition of the metadata
> > version
> > > > >> should
> > > > >> > > > > > increase
> > > > >> > > > > > > > the overhead of metadata refresh caused by
> > > > >> > > > InvalidMetadataException.
> > > > >> > > > > If
> > > > >> > > > > > > > client refresh metadata due to expiry and it
> receives
> > a
> > > > >> > metadata
> > > > >> > > > > whose
> > > > >> > > > > > > > version is lower than the current metadata version,
> we
> > > can
> > > > >> > reject
> > > > >> > > > the
> > > > >> > > > > > > > metadata but still reset the metadata age, which
> > > > essentially
> > > > >> > keep
> > > > >> > > > the
> > > > >> > > > > > > > existing behavior in the client.
> > > > >> > > > > > > >
> > > > >> > > > > > > > Thanks much,
> > > > >> > > > > > > > Dong
> > > > >> > > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
