Hey Jun,

Thanks so much. These comments are very useful. Please see my comments below.

On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. A few more comments.
>
> 60. Perhaps having a partition epoch is more flexible since in the future,
> we may support deleting a partition as well.
>

Yeah, I have considered this. I think we can probably still support deleting
a partition by using the topic_epoch: when a partition of a topic is deleted
or created, the epoch of all partitions of this topic will be incremented by
1. Therefore, if that partition is re-created later, its epoch will still be
larger than its epoch before the deletion, which still allows the client to
order the metadata for the purpose of this KIP. Does this sound reasonable?
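
For illustration only (the class and field names below are hypothetical, not
part of the KIP), a client could order the metadata for a partition by
comparing the (topic_epoch, leader_epoch) pair, with topic_epoch taking
precedence:

public final class PartitionMetadataVersion
    implements Comparable<PartitionMetadataVersion> {
  private final long topicEpoch;  // bumped when any partition of the topic is created/deleted
  private final int leaderEpoch;  // may restart from 0 after the topic is re-created

  public PartitionMetadataVersion(long topicEpoch, int leaderEpoch) {
    this.topicEpoch = topicEpoch;
    this.leaderEpoch = leaderEpoch;
  }

  @Override
  public int compareTo(PartitionMetadataVersion other) {
    // Larger topic_epoch always wins; leader_epoch only breaks ties within
    // the same topic incarnation.
    int cmp = Long.compare(topicEpoch, other.topicEpoch);
    return cmp != 0 ? cmp : Integer.compare(leaderEpoch, other.leaderEpoch);
  }
}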

The advantage of using topic_epoch instead of partition_epoch is that the
size of the /brokers/topics/[topic] znode and the request/response size can
be smaller. We have a limit on the maximum znode size (typically 1 MB). Using
a per-partition epoch would effectively reduce the number of partitions that
can be described by the /brokers/topics/[topic] znode.

One use-case of partition_epoch is for the client to detect that the
committed offset, either from the Kafka offset topic or from an external
store, is invalid after partition deletion and re-creation. However, it seems
that we can also address this use-case with other approaches. For example,
when AdminClient deletes partitions, it can also delete the committed offsets
for those partitions from the offset topic. If the user stores offsets
externally, it might make sense for the user to similarly remove the offsets
of the related partitions after these partitions are deleted. So I am not
sure that we should use partition_epoch in this KIP.


>
> 61. It seems that the leader epoch returned in the position() call should
> the the leader epoch returned in the fetch response, not the one in the
> metadata cache of the client.


I think this is a good idea. Just to double check: this change does not
affect the correctness or performance of this KIP, but it can be useful if we
want to use the leader_epoch to better handle the offset reset in case of
unclean leader election, which is listed in the future work. Is this
understanding correct?

I have updated the KIP to specify that the leader_epoch returned by
position() should be the largest leader_epoch of the already consumed
messages whose offset < position. If no message has been consumed since
consumer initialization, the leader_epoch from seek() or the
OffsetFetchResponse should be used. The leader_epoch included in the
OffsetCommitRequest will also be determined in a similar manner.
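
To make the rule concrete, here is a rough sketch (names are illustrative
only, not the KIP's implementation) of how a consumer could track the
leader_epoch that position() and the OffsetCommitRequest would report for a
partition:

public final class PositionEpochTracker {
  private long position;
  private int leaderEpoch;  // initially from seek() or OffsetFetchResponse

  public PositionEpochTracker(long initialPosition, int initialLeaderEpoch) {
    this.position = initialPosition;
    this.leaderEpoch = initialLeaderEpoch;
  }

  // Called for each consumed message; keeps the largest leader_epoch among
  // consumed messages whose offset < position.
  public void onMessageConsumed(long offset, int messageLeaderEpoch) {
    this.position = offset + 1;
    this.leaderEpoch = Math.max(this.leaderEpoch, messageLeaderEpoch);
  }

  public long position() {
    return position;
  }

  public int leaderEpoch() {
    return leaderEpoch;
  }
}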


>
> 62. I am wondering if we should return the partition epoch in the fetch
> response as well. In the current proposal, if a topic is recreated and the
> new leader is on the same broker as the old one, there is nothing to force
> the metadata refresh in the client. So, the client may still associate the
> offset with the old partition epoch.
>

Could you help me understand the problem if a client associates an old
partition_epoch (or the topic_epoch as of the current KIP) with the offset?
The main purpose of the topic_epoch is to be able to drop the leader_epoch to
0 after a partition is deleted and re-created. I guess you may be thinking
about using the partition_epoch to detect that the committed offset is
invalid? In that case, I am wondering if the alternative approach described
in 60) would be reasonable.


>
> 63. There is some subtle coordination between the LeaderAndIsrRequest and
> UpdateMetadataRequest. Currently, when a leader changes, the controller
> first sends the LeaderAndIsrRequest to the assigned replicas and the
> UpdateMetadataRequest to every broker. So, there could be a small window
> when the leader already receives the new partition epoch in the
> LeaderAndIsrRequest, but the metadata cache in the broker hasn't been
> updated with the latest partition epoch. Not sure what's the best way to
> address this issue. Perhaps we can update the metadata cache on the broker
> with both LeaderAndIsrRequest and UpdateMetadataRequest. The challenge is
> that the two have slightly different data. For example, only the latter has
> all endpoints.
>

I am not sure whether this is a problem. Could you explain a bit more what
specific problem this small window can cause?

Since the client can fetch metadata from any broker in the cluster, and given
that different brokers receive requests (e.g. LeaderAndIsrRequest and
UpdateMetadataRequest) in arbitrary order, the metadata received by the
client can be arbitrarily newer or older than a given broker's leadership
state, even if that broker receives the LeaderAndIsrRequest and
UpdateMetadataRequest simultaneously. So I am not sure it is useful to update
the broker's metadata cache with the LeaderAndIsrRequest.


> 64. The enforcement of leader epoch in Offset commit: We allow a consumer
> to set an arbitrary offset. So it's possible for offsets or leader epoch to
> go backwards. I am not sure if we could always enforce that the leader
> epoch only goes up on the broker.
>

Sure. I have removed this check from the KIP.

BTW, we can probably still ensure that the leader_epoch always increases if
the leader_epoch used with the offset commit is max(the leader_epoch of the
message with offset = committed offset - 1, the largest known leader_epoch
from the metadata). But I don't have a good use-case for this alternative
definition, so I chose to keep the KIP simple and not require the
leader_epoch to always increase.


> 65. Good point on handling missing partition epoch due to topic deletion.
> Another potential way to address this is to additionally propagate the
> global partition epoch to brokers and the clients. This way, when a
> partition epoch is missing, we can use the global partition epoch to reason
> about which metadata is more recent.
>

This is a great idea. The global epoch can be used to order the metadata and
help us recognize the more recent metadata if a topic (or partition) is
deleted and re-created.

Actually, it seems we only need to propagate the global epoch to brokers and
clients, without propagating this epoch on a per-topic or per-partition
basis. Doing so would simplify the interface changes made in this KIP. Does
this approach sound reasonable?


> 66. A client may also get an offset by time using the offsetForTimes() api.
> So, we probably want to include offsetInternalMetadata in
> OffsetAndTimestamp
> as well.
>

You are right. This probably requires us to change the ListOffsetRequest as
well. I will update the KIP after we agree on the solution for 65).


>
> 67. InteralMetadata can be a bit confusing with the metadata field already
> there. Perhaps we can just call it OffsetEpoch. It might be useful to make
> OffsetEpoch printable at least for debugging purpose. Once you do that, we
> are already exposing the internal fields. So, not sure if it's worth hiding
> them. If we do want to hide them, perhaps we can have sth like the
> following. The binary encoding is probably more efficient than JSON for
> external storage.
>
> OffsetEpoch {
>  static OffsetEpoch decode(byte[]);
>
>   public byte[] encode();
>
>   public String toString();
> }
>

Thanks much. I like this solution. I have updated the KIP accordingly.
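
For concreteness, here is a sketch of what the opaque OffsetEpoch could look
like internally, assuming it wraps (partition_epoch, leader_epoch) behind a
versioned binary encoding as discussed in this thread; the field sizes and
version handling are illustrative, not part of the KIP:

import java.nio.ByteBuffer;

public final class OffsetEpoch {
  private static final short CURRENT_VERSION = 0;

  private final int partitionEpoch;
  private final int leaderEpoch;

  public OffsetEpoch(int partitionEpoch, int leaderEpoch) {
    this.partitionEpoch = partitionEpoch;
    this.leaderEpoch = leaderEpoch;
  }

  public static OffsetEpoch decode(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    short version = buffer.getShort();
    if (version != CURRENT_VERSION)
      throw new IllegalArgumentException("Unknown OffsetEpoch version " + version);
    return new OffsetEpoch(buffer.getInt(), buffer.getInt());
  }

  public byte[] encode() {
    // 2 bytes version + 4 bytes partition_epoch + 4 bytes leader_epoch
    return ByteBuffer.allocate(10)
        .putShort(CURRENT_VERSION)
        .putInt(partitionEpoch)
        .putInt(leaderEpoch)
        .array();
  }

  @Override
  public String toString() {
    return "OffsetEpoch(partitionEpoch=" + partitionEpoch
        + ", leaderEpoch=" + leaderEpoch + ")";
  }
}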



>
> Jun
>
> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jason,
> >
> > Certainly. This sounds good. I have updated the KIP to clarity that the
> > global epoch will be incremented by 1 each time a topic is deleted.
> >
> > Thanks,
> > Dong
> >
> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Dong,
> > >
> > >
> > > I think your approach will allow user to distinguish between the
> metadata
> > > > before and after the topic deletion. I also agree that this can be
> > > > potentially be useful to user. I am just not very sure whether we
> > already
> > > > have a good use-case to make the additional complexity worthwhile. It
> > > seems
> > > > that this feature is kind of independent of the main problem of this
> > KIP.
> > > > Could we add this as a future work?
> > >
> > >
> > > Do you think it's fair if we bump the topic epoch on deletion and leave
> > > propagation of the epoch for deleted topics for future work? I don't
> > think
> > > this adds much complexity and it makes the behavior consistent: every
> > topic
> > > mutation results in an epoch bump.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Ismael,
> > > >
> > > > I guess we actually need user to see this field so that user can
> store
> > > this
> > > > value in the external store together with the offset. We just prefer
> > the
> > > > value to be opaque to discourage most users from interpreting this
> > value.
> > > > One more advantage of using such an opaque field is to be able to
> > evolve
> > > > the information (or schema) of this value without changing consumer
> API
> > > in
> > > > the future.
> > > >
> > > > I also thinking it is probably OK for user to be able to interpret
> this
> > > > value, particularly for those advanced users.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk>
> wrote:
> > > >
> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <
> ja...@confluent.io>
> > > > > wrote:
> > > > > >
> > > > > > class OffsetAndMetadata {
> > > > > >   long offset;
> > > > > >   byte[] offsetMetadata;
> > > > > >   String metadata;
> > > > > > }
> > > > >
> > > > >
> > > > > > Admittedly, the naming is a bit annoying, but we can probably
> come
> > up
> > > > > with
> > > > > > something better. Internally the byte array would have a version.
> > If
> > > in
> > > > > the
> > > > > > future we have anything else we need to add, we can update the
> > > version
> > > > > and
> > > > > > we wouldn't need any new APIs.
> > > > > >
> > > > >
> > > > > We can also add fields to a class in a compatible way. So, it seems
> > to
> > > me
> > > > > that the main advantage of the byte array is that it's opaque to
> the
> > > > user.
> > > > > Is that correct? If so, we could also add any opaque metadata in a
> > > > subclass
> > > > > so that users don't even see it (unless they cast it, but then
> > they're
> > > on
> > > > > their own).
> > > > >
> > > > > Ismael
> > > > >
> > > > > The corresponding seek() and position() APIs might look something
> > like
> > > > > this:
> > > > > >
> > > > > > void seek(TopicPartition partition, long offset, byte[]
> > > > offsetMetadata);
> > > > > > byte[] positionMetadata(TopicPartition partition);
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hey Jun, Jason,
> > > > > > >
> > > > > > > Thanks much for all the feedback. I have updated the KIP based
> on
> > > the
> > > > > > > latest discussion. Can you help check whether it looks good?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Hey Jun,
> > > > > > > >
> > > > > > > > Hmm... thinking about this more, I am not sure that the
> > proposed
> > > > API
> > > > > is
> > > > > > > > sufficient. For users that store offset externally, we
> probably
> > > > need
> > > > > > > extra
> > > > > > > > API to return the leader_epoch and partition_epoch for all
> > > > partitions
> > > > > > > that
> > > > > > > > consumers are consuming. I suppose these users currently use
> > > > > position()
> > > > > > > to
> > > > > > > > get the offset. Thus we probably need a new method
> > > > > > positionWithEpoch(..)
> > > > > > > to
> > > > > > > > return <offset, partition_epoch, leader_epoch>. Does this
> sound
> > > > > > > reasonable?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Dong
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io>
> > > wrote:
> > > > > > > >
> > > > > > > >> Hi, Dong,
> > > > > > > >>
> > > > > > > >> Yes, that's what I am thinking. OffsetEpoch will be composed
> > of
> > > > > > > >> (partition_epoch,
> > > > > > > >> leader_epoch).
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >>
> > > > > > > >> Jun
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <
> lindon...@gmail.com
> > >
> > > > > wrote:
> > > > > > > >>
> > > > > > > >> > Hey Jun,
> > > > > > > >> >
> > > > > > > >> > Thanks much. I like the the new API that you proposed. I
> am
> > > not
> > > > > sure
> > > > > > > >> what
> > > > > > > >> > you exactly mean by offset_epoch. I suppose that we can
> use
> > > the
> > > > > pair
> > > > > > > of
> > > > > > > >> > (partition_epoch, leader_epoch) as the offset_epoch,
> right?
> > > > > > > >> >
> > > > > > > >> > Thanks,
> > > > > > > >> > Dong
> > > > > > > >> >
> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io
> >
> > > > wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi, Dong,
> > > > > > > >> > >
> > > > > > > >> > > Got it. The api that you proposed works. The question is
> > > > whether
> > > > > > > >> that's
> > > > > > > >> > the
> > > > > > > >> > > api that we want to have in the long term. My concern is
> > > that
> > > > > > while
> > > > > > > >> the
> > > > > > > >> > api
> > > > > > > >> > > change is simple, the new api seems harder to explain
> and
> > > use.
> > > > > For
> > > > > > > >> > example,
> > > > > > > >> > > a consumer storing offsets externally now needs to call
> > > > > > > >> > > waitForMetadataUpdate() after calling seek().
> > > > > > > >> > >
> > > > > > > >> > > An alternative approach is to make the following
> > compatible
> > > > api
> > > > > > > >> changes
> > > > > > > >> > in
> > > > > > > >> > > Consumer.
> > > > > > > >> > > * Add an additional OffsetEpoch field in
> > OffsetAndMetadata.
> > > > (no
> > > > > > need
> > > > > > > >> to
> > > > > > > >> > > change the CommitSync() api)
> > > > > > > >> > > * Add a new api seek(TopicPartition partition, long
> > offset,
> > > > > > > >> OffsetEpoch
> > > > > > > >> > > offsetEpoch). We can potentially deprecate the old api
> > > > > > > >> > seek(TopicPartition
> > > > > > > >> > > partition, long offset) in the future.
> > > > > > > >> > >
> > > > > > > >> > > The alternative approach has similar amount of api
> changes
> > > as
> > > > > > yours
> > > > > > > >> but
> > > > > > > >> > has
> > > > > > > >> > > the following benefits.
> > > > > > > >> > > 1. The api works in a similar way as how offset
> management
> > > > works
> > > > > > now
> > > > > > > >> and
> > > > > > > >> > is
> > > > > > > >> > > probably what we want in the long term.
> > > > > > > >> > > 2. It can reset offsets better when there is data loss
> due
> > > to
> > > > > > > unclean
> > > > > > > >> > > leader election or correlated replica failure.
> > > > > > > >> > > 3. It can reset offsets better when topic is recreated.
> > > > > > > >> > >
> > > > > > > >> > > Thanks,
> > > > > > > >> > >
> > > > > > > >> > > Jun
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <
> > > lindon...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Hey Jun,
> > > > > > > >> > > >
> > > > > > > >> > > > Yeah I agree that ideally we don't want an ever
> growing
> > > > global
> > > > > > > >> metadata
> > > > > > > >> > > > version. I just think it may be more desirable to keep
> > the
> > > > > > > consumer
> > > > > > > >> API
> > > > > > > >> > > > simple.
> > > > > > > >> > > >
> > > > > > > >> > > > In my current proposal, metadata version returned in
> the
> > > > fetch
> > > > > > > >> response
> > > > > > > >> > > > will be stored with the offset together. More
> > > specifically,
> > > > > the
> > > > > > > >> > > > metadata_epoch in the new offset topic schema will be
> > the
> > > > > > largest
> > > > > > > >> > > > metadata_epoch from all the MetadataResponse and
> > > > FetchResponse
> > > > > > > ever
> > > > > > > >> > > > received by this consumer.
> > > > > > > >> > > >
> > > > > > > >> > > > We probably don't have to change the consumer API for
> > > > > > > >> > > > commitSync(Map<TopicPartition, OffsetAndMetadata>). If
> > > user
> > > > > > calls
> > > > > > > >> > > > commitSync(...) to commit offset 10 for a given
> > partition,
> > > > for
> > > > > > > most
> > > > > > > >> > > > use-cases, this consumer instance should have consumed
> > > > message
> > > > > > > with
> > > > > > > >> > > offset
> > > > > > > >> > > > 9 from this partition, in which case the consumer can
> > > > remember
> > > > > > and
> > > > > > > >> use
> > > > > > > >> > > the
> > > > > > > >> > > > metadata_epoch from the corresponding FetchResponse
> when
> > > > > > > committing
> > > > > > > >> > > offset.
> > > > > > > >> > > > If user calls commitSync(..) to commit offset 10 for a
> > > given
> > > > > > > >> partition
> > > > > > > >> > > > without having consumed the message with offset 9
> using
> > > this
> > > > > > > >> consumer
> > > > > > > >> > > > instance, this is probably an advanced use-case. In
> this
> > > > case
> > > > > > the
> > > > > > > >> > > advanced
> > > > > > > >> > > > user can retrieve the metadata_epoch using the newly
> > added
> > > > > > > >> > > metadataEpoch()
> > > > > > > >> > > > API after it fetches the message with offset 9
> (probably
> > > > from
> > > > > > > >> another
> > > > > > > >> > > > consumer instance) and encode this metadata_epoch in
> the
> > > > > > > >> > > > string OffsetAndMetadata.metadata. Do you think this
> > > > solution
> > > > > > > would
> > > > > > > >> > work?
> > > > > > > >> > > >
> > > > > > > >> > > > By "not sure that I fully understand your latest
> > > > suggestion",
> > > > > > are
> > > > > > > >> you
> > > > > > > >> > > > referring to solution related to unclean leader
> election
> > > > using
> > > > > > > >> > > leader_epoch
> > > > > > > >> > > > in my previous email?
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks,
> > > > > > > >> > > > Dong
> > > > > > > >> > > >
> > > > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <
> > j...@confluent.io
> > > >
> > > > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Hi, Dong,
> > > > > > > >> > > > >
> > > > > > > >> > > > > Not sure that I fully understand your latest
> > suggestion.
> > > > > > > >> Returning an
> > > > > > > >> > > > ever
> > > > > > > >> > > > > growing global metadata version itself is no ideal,
> > but
> > > is
> > > > > > fine.
> > > > > > > >> My
> > > > > > > >> > > > > question is whether the metadata version returned in
> > the
> > > > > fetch
> > > > > > > >> > response
> > > > > > > >> > > > > needs to be stored with the offset together if
> offsets
> > > are
> > > > > > > stored
> > > > > > > >> > > > > externally. If so, we also have to change the
> consumer
> > > API
> > > > > for
> > > > > > > >> > > > commitSync()
> > > > > > > >> > > > > and need to worry about compatibility. If we don't
> > store
> > > > the
> > > > > > > >> metadata
> > > > > > > >> > > > > version together with the offset, on a consumer
> > restart,
> > > > > it's
> > > > > > > not
> > > > > > > >> > clear
> > > > > > > >> > > > how
> > > > > > > >> > > > > we can ensure the metadata in the consumer is high
> > > enough
> > > > > > since
> > > > > > > >> there
> > > > > > > >> > > is
> > > > > > > >> > > > no
> > > > > > > >> > > > > metadata version to compare with.
> > > > > > > >> > > > >
> > > > > > > >> > > > > Thanks,
> > > > > > > >> > > > >
> > > > > > > >> > > > > Jun
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <
> > > > > lindon...@gmail.com
> > > > > > >
> > > > > > > >> > wrote:
> > > > > > > >> > > > >
> > > > > > > >> > > > > > Hey Jun,
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Thanks much for the explanation.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > I understand the advantage of partition_epoch over
> > > > > > > >> metadata_epoch.
> > > > > > > >> > My
> > > > > > > >> > > > > > current concern is that the use of leader_epoch
> and
> > > the
> > > > > > > >> > > partition_epoch
> > > > > > > >> > > > > > requires us considerable change to consumer's
> public
> > > API
> > > > > to
> > > > > > > take
> > > > > > > >> > care
> > > > > > > >> > > > of
> > > > > > > >> > > > > > the case where user stores offset externally. For
> > > > example,
> > > > > > > >> > > *consumer*.
> > > > > > > >> > > > > > *commitSync*(..) would have to take a map whose
> > value
> > > is
> > > > > > > >> <offset,
> > > > > > > >> > > > > metadata,
> > > > > > > >> > > > > > leader epoch, partition epoch>.
> > *consumer*.*seek*(...)
> > > > > would
> > > > > > > >> also
> > > > > > > >> > > need
> > > > > > > >> > > > > > leader_epoch and partition_epoch as parameter.
> > > > Technically
> > > > > > we
> > > > > > > >> can
> > > > > > > >> > > > > probably
> > > > > > > >> > > > > > still make it work in a backward compatible manner
> > > after
> > > > > > > careful
> > > > > > > >> > > design
> > > > > > > >> > > > > and
> > > > > > > >> > > > > > discussion. But these changes can make the
> > consumer's
> > > > > > > interface
> > > > > > > >> > > > > > unnecessarily complex for more users who do not
> > store
> > > > > offset
> > > > > > > >> > > > externally.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > After thinking more about it, we can address all
> > > > problems
> > > > > > > >> discussed
> > > > > > > >> > > by
> > > > > > > >> > > > > only
> > > > > > > >> > > > > > using the metadata_epoch without introducing
> > > > leader_epoch
> > > > > or
> > > > > > > the
> > > > > > > >> > > > > > partition_epoch. The current KIP describes the
> > changes
> > > > to
> > > > > > the
> > > > > > > >> > > consumer
> > > > > > > >> > > > > API
> > > > > > > >> > > > > > and how the new API can be used if user stores
> > offset
> > > > > > > >> externally.
> > > > > > > >> > In
> > > > > > > >> > > > > order
> > > > > > > >> > > > > > to address the scenario you described earlier, we
> > can
> > > > > > include
> > > > > > > >> > > > > > metadata_epoch in the FetchResponse and the
> > > > > > > LeaderAndIsrRequest.
> > > > > > > >> > > > Consumer
> > > > > > > >> > > > > > remembers the largest metadata_epoch from all the
> > > > > > > FetchResponse
> > > > > > > >> it
> > > > > > > >> > > has
> > > > > > > >> > > > > > received. The metadata_epoch committed with the
> > > offset,
> > > > > > either
> > > > > > > >> > within
> > > > > > > >> > > > or
> > > > > > > >> > > > > > outside Kafka, should be the largest
> metadata_epoch
> > > > across
> > > > > > all
> > > > > > > >> > > > > > FetchResponse and MetadataResponse ever received
> by
> > > this
> > > > > > > >> consumer.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > The drawback of using only the metadata_epoch is
> > that
> > > we
> > > > > can
> > > > > > > not
> > > > > > > >> > > always
> > > > > > > >> > > > > do
> > > > > > > >> > > > > > the smart offset reset in case of unclean leader
> > > > election
> > > > > > > which
> > > > > > > >> you
> > > > > > > >> > > > > > mentioned earlier. But in most case, unclean
> leader
> > > > > election
> > > > > > > >> > probably
> > > > > > > >> > > > > > happens when consumer is not
> rebalancing/restarting.
> > > In
> > > > > > these
> > > > > > > >> > cases,
> > > > > > > >> > > > > either
> > > > > > > >> > > > > > consumer is not directly affected by unclean
> leader
> > > > > election
> > > > > > > >> since
> > > > > > > >> > it
> > > > > > > >> > > > is
> > > > > > > >> > > > > > not consuming from the end of the log, or consumer
> > can
> > > > > > derive
> > > > > > > >> the
> > > > > > > >> > > > > > leader_epoch from the most recent message received
> > > > before
> > > > > it
> > > > > > > >> sees
> > > > > > > >> > > > > > OffsetOutOfRangeException. So I am not sure it is
> > > worth
> > > > > > adding
> > > > > > > >> the
> > > > > > > >> > > > > > leader_epoch to consumer API to address the
> > remaining
> > > > > corner
> > > > > > > >> case.
> > > > > > > >> > > What
> > > > > > > >> > > > > do
> > > > > > > >> > > > > > you think?
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Thanks,
> > > > > > > >> > > > > > Dong
> > > > > > > >> > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <
> > > > j...@confluent.io
> > > > > >
> > > > > > > >> wrote:
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > > Hi, Dong,
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Thanks for the reply.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > To solve the topic recreation issue, we could
> use
> > > > > either a
> > > > > > > >> global
> > > > > > > >> > > > > > metadata
> > > > > > > >> > > > > > > version or a partition level epoch. But either
> one
> > > > will
> > > > > > be a
> > > > > > > >> new
> > > > > > > >> > > > > concept,
> > > > > > > >> > > > > > > right? To me, the latter seems more natural. It
> > also
> > > > > makes
> > > > > > > it
> > > > > > > >> > > easier
> > > > > > > >> > > > to
> > > > > > > >> > > > > > > detect if a consumer's offset is still valid
> > after a
> > > > > topic
> > > > > > > is
> > > > > > > >> > > > > recreated.
> > > > > > > >> > > > > > As
> > > > > > > >> > > > > > > you pointed out, we don't need to store the
> > > partition
> > > > > > epoch
> > > > > > > in
> > > > > > > >> > the
> > > > > > > >> > > > > > message.
> > > > > > > >> > > > > > > The following is what I am thinking. When a
> > > partition
> > > > is
> > > > > > > >> created,
> > > > > > > >> > > we
> > > > > > > >> > > > > can
> > > > > > > >> > > > > > > assign a partition epoch from an ever-increasing
> > > > global
> > > > > > > >> counter
> > > > > > > >> > and
> > > > > > > >> > > > > store
> > > > > > > >> > > > > > > it in /brokers/topics/[topic]/
> > > > partitions/[partitionId]
> > > > > in
> > > > > > > ZK.
> > > > > > > >> > The
> > > > > > > >> > > > > > > partition
> > > > > > > >> > > > > > > epoch is propagated to every broker. The
> consumer
> > > will
> > > > > be
> > > > > > > >> > tracking
> > > > > > > >> > > a
> > > > > > > >> > > > > > tuple
> > > > > > > >> > > > > > > of <offset, leader epoch, partition epoch> for
> > > > offsets.
> > > > > > If a
> > > > > > > >> > topic
> > > > > > > >> > > is
> > > > > > > >> > > > > > > recreated, it's possible that a consumer's
> offset
> > > and
> > > > > > leader
> > > > > > > >> > epoch
> > > > > > > >> > > > > still
> > > > > > > >> > > > > > > match that in the broker, but partition epoch
> > won't
> > > > be.
> > > > > In
> > > > > > > >> this
> > > > > > > >> > > case,
> > > > > > > >> > > > > we
> > > > > > > >> > > > > > > can potentially still treat the consumer's
> offset
> > as
> > > > out
> > > > > > of
> > > > > > > >> range
> > > > > > > >> > > and
> > > > > > > >> > > > > > reset
> > > > > > > >> > > > > > > the offset based on the offset reset policy in
> the
> > > > > > consumer.
> > > > > > > >> This
> > > > > > > >> > > > seems
> > > > > > > >> > > > > > > harder to do with a global metadata version.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Jun
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <
> > > > > > > >> lindon...@gmail.com>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > > Hey Jun,
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > This is a very good example. After thinking
> > > through
> > > > > this
> > > > > > > in
> > > > > > > >> > > > detail, I
> > > > > > > >> > > > > > > agree
> > > > > > > >> > > > > > > > that we need to commit offset with leader
> epoch
> > in
> > > > > order
> > > > > > > to
> > > > > > > >> > > address
> > > > > > > >> > > > > > this
> > > > > > > >> > > > > > > > example.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > I think the remaining question is how to
> address
> > > the
> > > > > > > >> scenario
> > > > > > > >> > > that
> > > > > > > >> > > > > the
> > > > > > > >> > > > > > > > topic is deleted and re-created. One possible
> > > > solution
> > > > > > is
> > > > > > > to
> > > > > > > >> > > commit
> > > > > > > >> > > > > > > offset
> > > > > > > >> > > > > > > > with both the leader epoch and the metadata
> > > version.
> > > > > The
> > > > > > > >> logic
> > > > > > > >> > > and
> > > > > > > >> > > > > the
> > > > > > > >> > > > > > > > implementation of this solution does not
> > require a
> > > > new
> > > > > > > >> concept
> > > > > > > >> > > > (e.g.
> > > > > > > >> > > > > > > > partition epoch) and it does not require any
> > > change
> > > > to
> > > > > > the
> > > > > > > >> > > message
> > > > > > > >> > > > > > format
> > > > > > > >> > > > > > > > or leader epoch. It also allows us to order
> the
> > > > > metadata
> > > > > > > in
> > > > > > > >> a
> > > > > > > >> > > > > > > > straightforward manner which may be useful in
> > the
> > > > > > future.
> > > > > > > >> So it
> > > > > > > >> > > may
> > > > > > > >> > > > > be
> > > > > > > >> > > > > > a
> > > > > > > >> > > > > > > > better solution than generating a random
> > partition
> > > > > epoch
> > > > > > > >> every
> > > > > > > >> > > time
> > > > > > > >> > > > > we
> > > > > > > >> > > > > > > > create a partition. Does this sound
> reasonable?
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Previously one concern with using the metadata
> > > > version
> > > > > > is
> > > > > > > >> that
> > > > > > > >> > > > > consumer
> > > > > > > >> > > > > > > > will be forced to refresh metadata even if
> > > metadata
> > > > > > > version
> > > > > > > >> is
> > > > > > > >> > > > > > increased
> > > > > > > >> > > > > > > > due to topics that the consumer is not
> > interested
> > > > in.
> > > > > > Now
> > > > > > > I
> > > > > > > >> > > > realized
> > > > > > > >> > > > > > that
> > > > > > > >> > > > > > > > this is probably not a problem. Currently
> client
> > > > will
> > > > > > > >> refresh
> > > > > > > >> > > > > metadata
> > > > > > > >> > > > > > > > either due to InvalidMetadataException in the
> > > > response
> > > > > > > from
> > > > > > > >> > > broker
> > > > > > > >> > > > or
> > > > > > > >> > > > > > due
> > > > > > > >> > > > > > > > to metadata expiry. The addition of the
> metadata
> > > > > version
> > > > > > > >> should
> > > > > > > >> > > > > > increase
> > > > > > > >> > > > > > > > the overhead of metadata refresh caused by
> > > > > > > >> > > > InvalidMetadataException.
> > > > > > > >> > > > > If
> > > > > > > >> > > > > > > > client refresh metadata due to expiry and it
> > > > receives
> > > > > a
> > > > > > > >> > metadata
> > > > > > > >> > > > > whose
> > > > > > > >> > > > > > > > version is lower than the current metadata
> > > version,
> > > > we
> > > > > > can
> > > > > > > >> > reject
> > > > > > > >> > > > the
> > > > > > > >> > > > > > > > metadata but still reset the metadata age,
> which
> > > > > > > essentially
> > > > > > > >> > keep
> > > > > > > >> > > > the
> > > > > > > >> > > > > > > > existing behavior in the client.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Thanks much,
> > > > > > > >> > > > > > > > Dong
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
