Hey Jun, Jason,

Thanks for all the comments. Could you see if you can give a +1 for the KIP?
I am open to making further improvements to the KIP.

Thanks,
Dong

On Tue, Jan 23, 2018 at 3:44 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun, Jason,
>
> Thanks much for all the review! I will open the voting thread.
>
> Regards,
> Dong
>
> On Tue, Jan 23, 2018 at 3:37 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> The current KIP looks good to me.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Jan 23, 2018 at 12:29 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Jun,
>> >
>> > Do you think the current KIP looks OK? I am wondering if we can open the
>> > voting thread.
>> >
>> > Thanks!
>> > Dong
>> >
>> > On Fri, Jan 19, 2018 at 3:08 PM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > > Hey Jun,
>> > >
>> > > I think we can probably have a static method in a Util class to decode
>> > > the byte[]. Both the KafkaConsumer implementation and the user
>> > > application will be able to decode the byte array and log its content
>> > > for debugging purposes. So it seems that we can still print the
>> > > information we want. It is just not explicitly exposed in the consumer
>> > > interface. Would this address the problem here?
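>> > >
>> > > (As a minimal sketch of the idea, assuming the byte[] encodes a
>> > > version, a partition_epoch and a leader_epoch as described in the KIP;
>> > > the class and method names here are only illustrative:)
>> > >
>> > > import java.nio.ByteBuffer;
>> > >
>> > > public final class OffsetEpochUtil {
>> > >     // Decode an opaque offset-epoch byte[] purely for debug logging.
>> > >     public static String describe(byte[] offsetEpoch) {
>> > >         ByteBuffer buf = ByteBuffer.wrap(offsetEpoch);
>> > >         short version = buf.getShort();
>> > >         int partitionEpoch = buf.getInt();
>> > >         int leaderEpoch = buf.getInt();
>> > >         return "OffsetEpoch(version=" + version
>> > >             + ", partitionEpoch=" + partitionEpoch
>> > >             + ", leaderEpoch=" + leaderEpoch + ")";
>> > >     }
>> > > }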
>> > >
>> > > Yeah we can include OffsetEpoch in AdminClient. This can be added in
>> > > KIP-222? Is there something you would like me to add in this KIP?
>> > >
>> > > Thanks!
>> > > Dong
>> > >
>> > > On Fri, Jan 19, 2018 at 3:00 PM, Jun Rao <j...@confluent.io> wrote:
>> > >
>> > >> Hi, Dong,
>> > >>
>> > >> The issue with using just byte[] for OffsetEpoch is that it won't be
>> > >> printable, which makes debugging harder.
>> > >>
>> > >> Also, KIP-222 proposes a listGroupOffset() method in AdminClient. If
>> > >> that gets adopted before this KIP, we probably want to include
>> > >> OffsetEpoch in the AdminClient too.
>> > >>
>> > >> Thanks,
>> > >>
>> > >> Jun
>> > >>
>> > >>
>> > >> On Thu, Jan 18, 2018 at 6:30 PM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > >>
>> > >> > Hey Jun,
>> > >> >
>> > >> > I agree. I have updated the KIP to remove the class OffsetEpoch and
>> > >> > replace OffsetEpoch with byte[] in the APIs that use it. Can you see
>> > >> > if it looks good?
>> > >> >
>> > >> > Thanks!
>> > >> > Dong
>> > >> >
>> > >> > On Thu, Jan 18, 2018 at 6:07 PM, Jun Rao <j...@confluent.io> wrote:
>> > >> >
>> > >> > > Hi, Dong,
>> > >> > >
>> > >> > > Thanks for the updated KIP. It looks good to me now. The only thing
>> > >> > > is for OffsetEpoch. If we expose the individual fields in the class,
>> > >> > > we probably don't need the encode/decode methods. If we want to hide
>> > >> > > the details of OffsetEpoch, we probably don't want to expose the
>> > >> > > individual fields.
>> > >> > >
>> > >> > > Jun
>> > >> > >
>> > >> > > On Wed, Jan 17, 2018 at 10:10 AM, Dong Lin <lindon...@gmail.com>
>> > >> wrote:
>> > >> > >
>> > >> > > > Thinking about point 61 more, I realize that the async zookeeper
>> > >> > > > read may make it less of an issue for the controller to read more
>> > >> > > > zookeeper nodes. Writing the partition_epoch in the per-partition
>> > >> > > > znode makes it simpler to handle a broker failure between the
>> > >> > > > zookeeper writes for a topic creation. I have updated the KIP to
>> > >> > > > use the suggested approach.
>> > >> > > >
>> > >> > > >
>> > >> > > > On Wed, Jan 17, 2018 at 9:57 AM, Dong Lin <lindon...@gmail.com
>> >
>> > >> wrote:
>> > >> > > >
>> > >> > > > > Hey Jun,
>> > >> > > > >
>> > >> > > > > Thanks much for the comments. Please see my comments inline.
>> > >> > > > >
>> > >> > > > > On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <j...@confluent.io>
>> > >> wrote:
>> > >> > > > >
>> > >> > > > >> Hi, Dong,
>> > >> > > > >>
>> > >> > > > >> Thanks for the updated KIP. Looks good to me overall. Just a
>> > few
>> > >> > minor
>> > >> > > > >> comments.
>> > >> > > > >>
>> > >> > > > >> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition
>> > >> > > > >> partition): It seems that there is no need to return metadata.
>> > >> > > > >> We probably want to return something like OffsetAndEpoch.
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > Previously I thought we might want to re-use the existing class
>> > >> > > > > to keep our consumer interface simpler. I have updated the KIP
>> > >> > > > > to add the class OffsetAndOffsetEpoch. I didn't use
>> > >> > > > > OffsetAndEpoch because a user may confuse this name with
>> > >> > > > > OffsetEpoch. Does this sound OK?
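>> > >> > > > >
>> > >> > > > > (For concreteness, a minimal sketch of what such a class might
>> > >> > > > > look like; the field types and names are illustrative, not from
>> > >> > > > > the KIP text:)
>> > >> > > > >
>> > >> > > > > public class OffsetAndOffsetEpoch {
>> > >> > > > >     private final long offset;
>> > >> > > > >     // opaque, encodes (partition_epoch, leader_epoch)
>> > >> > > > >     private final byte[] offsetEpoch;
>> > >> > > > >
>> > >> > > > >     public OffsetAndOffsetEpoch(long offset, byte[] offsetEpoch) {
>> > >> > > > >         this.offset = offset;
>> > >> > > > >         this.offsetEpoch = offsetEpoch;
>> > >> > > > >     }
>> > >> > > > >
>> > >> > > > >     public long offset() { return offset; }
>> > >> > > > >     public byte[] offsetEpoch() { return offsetEpoch; }
>> > >> > > > > }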
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >>
>> > >> > > > >> 61. Should we store partition_epoch in
>> > >> > > > >> /brokers/topics/[topic]/partitions/[partitionId] in ZK?
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > I have considered this. I think the advantage of adding the
>> > >> > > > > partition->partition_epoch map in the existing znode
>> > >> > > > > /brokers/topics/[topic]/partitions is that the controller only
>> > >> > > > > needs to read one znode per topic to get its partition_epoch
>> > >> > > > > information. Otherwise the controller may need to read one extra
>> > >> > > > > znode per partition to get the same information.
>> > >> > > > >
>> > >> > > > > When we delete a partition or expand the partitions of a topic,
>> > >> > > > > someone needs to modify the partition->partition_epoch map in
>> > >> > > > > the znode /brokers/topics/[topic]/partitions. This may seem a
>> > >> > > > > bit more complicated than simply adding or deleting the znode
>> > >> > > > > /brokers/topics/[topic]/partitions/[partitionId]. But the
>> > >> > > > > complexity is probably similar to the existing operation of
>> > >> > > > > modifying the partition->replica_list mapping in the znode
>> > >> > > > > /brokers/topics/[topic]. So I am not sure it is better to store
>> > >> > > > > the partition_epoch in
>> > >> > > > > /brokers/topics/[topic]/partitions/[partitionId]. What do you
>> > >> > > > > think?
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >>
>> > >> > > > >> 62. For checking outdated metadata in the client, we
>> probably
>> > >> want
>> > >> > to
>> > >> > > > add
>> > >> > > > >> when max_partition_epoch will be used.
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > The max_partition_epoch is used in the Proposed Changes ->
>> > >> > > > > Client's metadata refresh section to determine whether the
>> > >> > > > > metadata is outdated. And this formula is referenced and re-used
>> > >> > > > > in other sections to determine whether the metadata is outdated.
>> > >> > > > > Does this formula look OK?
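>> > >> > > > >
>> > >> > > > > (Purely to illustrate the shape of such a staleness check, not
>> > >> > > > > the KIP's exact formula; the names below are hypothetical:)
>> > >> > > > >
>> > >> > > > > final class MetadataFreshness {
>> > >> > > > >     // treat a metadata response as outdated if its
>> > >> > > > >     // max_partition_epoch is older than the largest one this
>> > >> > > > >     // client has already observed
>> > >> > > > >     static boolean isOutdated(int responseMaxPartitionEpoch,
>> > >> > > > >                               int largestSeenMaxPartitionEpoch) {
>> > >> > > > >         return responseMaxPartitionEpoch < largestSeenMaxPartitionEpoch;
>> > >> > > > >     }
>> > >> > > > > }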
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >>
>> > >> > > > >> 63. "The leader_epoch should be the largest leader_epoch of
>> > >> messages
>> > >> > > > whose
>> > >> > > > >> offset < the commit offset. If no message has been consumed
>> > since
>> > >> > > > consumer
>> > >> > > > >> initialization, the leader_epoch from seek(...) or
>> > >> > OffsetFetchResponse
>> > >> > > > >> should be used. The partition_epoch should be read from the
>> > last
>> > >> > > > >> FetchResponse corresponding to the given partition and
>> commit
>> > >> > offset.
>> > >> > > ":
>> > >> > > > >> leader_epoch and partition_epoch are associated with an
>> offset.
>> > >> So,
>> > >> > if
>> > >> > > > no
>> > >> > > > >> message is consumed, there is no offset and therefore there
>> is
>> > no
>> > >> > need
>> > >> > > > to
>> > >> > > > >> read leader_epoch and partition_epoch. Also, the
>> leader_epoch
>> > >> > > associated
>> > >> > > > >> with the offset should just come from the messages returned
>> in
>> > >> the
>> > >> > > fetch
>> > >> > > > >> response.
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > I am thinking that, if a user calls seek(..) and commitSync(...)
>> > >> > > > > without consuming any messages, we should re-use the
>> > >> > > > > leader_epoch and partition_epoch provided by the seek(...) in
>> > >> > > > > the OffsetCommitRequest. And if messages have been successfully
>> > >> > > > > consumed, then the leader_epoch will come from the messages
>> > >> > > > > returned in the fetch response. The condition "messages whose
>> > >> > > > > offset < the commit offset" is needed to take care of
>> > >> > > > > log-compacted topics, which may have offset gaps due to log
>> > >> > > > > cleaning.
>> > >> > > > >
>> > >> > > > > Did I miss something here? Or should I rephrase the paragraph to
>> > >> > > > > make it less confusing?
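>> > >> > > > >
>> > >> > > > > (To make the intended rule concrete, a hypothetical sketch of
>> > >> > > > > how the consumer might pick the epochs for the
>> > >> > > > > OffsetCommitRequest; all names here are illustrative:)
>> > >> > > > >
>> > >> > > > > final class CommitEpochs {
>> > >> > > > >     final int partitionEpoch;
>> > >> > > > >     final int leaderEpoch;
>> > >> > > > >     CommitEpochs(int partitionEpoch, int leaderEpoch) {
>> > >> > > > >         this.partitionEpoch = partitionEpoch;
>> > >> > > > >         this.leaderEpoch = leaderEpoch;
>> > >> > > > >     }
>> > >> > > > > }
>> > >> > > > >
>> > >> > > > > final class CommitEpochSelector {
>> > >> > > > >     private CommitEpochs fromSeek;  // provided via seek(...)
>> > >> > > > >     private CommitEpochs fromFetch; // from records consumed with
>> > >> > > > >                                     // offset < the commit offset
>> > >> > > > >
>> > >> > > > >     void onSeek(CommitEpochs e) { fromSeek = e; }
>> > >> > > > >     void onConsumed(CommitEpochs e) { fromFetch = e; }
>> > >> > > > >
>> > >> > > > >     // re-use seek()'s epochs if nothing was consumed since init
>> > >> > > > >     CommitEpochs epochsForCommit() {
>> > >> > > > >         return fromFetch != null ? fromFetch : fromSeek;
>> > >> > > > >     }
>> > >> > > > > }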
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >> 64. Could you include the public methods in the OffsetEpoch
>> > >> class?
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > I mistakenly deleted the definition of OffsetEpoch class from
>> > the
>> > >> > KIP.
>> > >> > > I
>> > >> > > > > just added it back with the public methods. Could you take
>> > another
>> > >> > > look?
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >>
>> > >> > > > >> Jun
>> > >> > > > >>
>> > >> > > > >>
>> > >> > > > >> On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <
>> lindon...@gmail.com
>> > >
>> > >> > > wrote:
>> > >> > > > >>
>> > >> > > > >> > Hey Jun,
>> > >> > > > >> >
>> > >> > > > >> > Thanks much. I agree that we cannot rely on committed offsets
>> > >> > > > >> > always being deleted when we delete a topic. So it is
>> > >> > > > >> > necessary to use a per-partition epoch that does not change
>> > >> > > > >> > unless the partition is deleted. I also agree that it is very
>> > >> > > > >> > nice to be able to uniquely identify a message with (offset,
>> > >> > > > >> > leader_epoch, partition_epoch) in the face of potential topic
>> > >> > > > >> > deletion and unclean leader election.
>> > >> > > > >> >
>> > >> > > > >> > I agree with all your comments. And I have updated the KIP
>> > >> > > > >> > based on our latest discussion. In addition, I added
>> > >> > > > >> > InvalidPartitionEpochException, which will be thrown by
>> > >> > > > >> > consumer.poll() if the partition_epoch associated with the
>> > >> > > > >> > partition, which can be given to the consumer using seek(...),
>> > >> > > > >> > is different from the partition_epoch in the FetchResponse.
>> > >> > > > >> >
>> > >> > > > >> > Can you take another look at the latest KIP?
>> > >> > > > >> >
>> > >> > > > >> > Thanks!
>> > >> > > > >> > Dong
>> > >> > > > >> >
>> > >> > > > >> >
>> > >> > > > >> >
>> > >> > > > >> > On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <
>> j...@confluent.io>
>> > >> > wrote:
>> > >> > > > >> >
>> > >> > > > >> > > Hi, Dong,
>> > >> > > > >> > >
>> > >> > > > >> > > My replies are the following.
>> > >> > > > >> > >
>> > >> > > > >> > > 60. What you described could also work. The drawback is
>> > that
>> > >> we
>> > >> > > will
>> > >> > > > >> be
>> > >> > > > >> > > unnecessarily changing the partition epoch when a
>> partition
>> > >> > hasn't
>> > >> > > > >> really
>> > >> > > > >> > > changed. I was imagining that the partition epoch will
>> be
>> > >> stored
>> > >> > > in
>> > >> > > > >> > > /brokers/topics/[topic]/partitions/[partitionId],
>> instead
>> > >> of at
>> > >> > > the
>> > >> > > > >> > topic
>> > >> > > > >> > > level. So, not sure if ZK size limit is an issue.
>> > >> > > > >> > >
>> > >> > > > >> > > 61, 62 and 65. To me, the offset + offset_epoch is a unique
>> > >> > > > >> > > identifier for a message. So, if a message hasn't changed,
>> > >> > > > >> > > the offset and the associated offset_epoch ideally should
>> > >> > > > >> > > remain the same (it would be kind of weird if two consumer
>> > >> > > > >> > > apps saved the offset of the same message but the
>> > >> > > > >> > > offset_epochs were different). partition_epoch +
>> > >> > > > >> > > leader_epoch give us that; global_epoch + leader_epoch
>> > >> > > > >> > > don't. If we use this approach, we can solve not only the
>> > >> > > > >> > > problem that you have identified, but also, more reliably,
>> > >> > > > >> > > other problems when there is data loss or topic
>> > >> > > > >> > > re-creation. For example, in the future, if we include the
>> > >> > > > >> > > partition_epoch and leader_epoch in the fetch request, the
>> > >> > > > >> > > server can do a more reliable check of whether that offset
>> > >> > > > >> > > is valid or not. I am not sure that we can rely upon all
>> > >> > > > >> > > external offsets being removed on topic deletion. For
>> > >> > > > >> > > example, a topic may be deleted by an admin who may not
>> > >> > > > >> > > know all the applications.
>> > >> > > > >> > >
>> > >> > > > >> > > If we agree on the above, the second question is then how
>> > >> > > > >> > > to reliably propagate the partition_epoch and the
>> > >> > > > >> > > leader_epoch to the consumer when there are leader or
>> > >> > > > >> > > partition changes. The leader_epoch comes from the message,
>> > >> > > > >> > > which is reliable. So, I was suggesting that when we store
>> > >> > > > >> > > an offset, we can just store the leader_epoch from the
>> > >> > > > >> > > message set containing that offset. Similarly, I was
>> > >> > > > >> > > thinking that if the partition_epoch is in the fetch
>> > >> > > > >> > > response, we can propagate the partition_epoch reliably
>> > >> > > > >> > > when there is a partition_epoch change.
>> > >> > > > >> > >
>> > >> > > > >> > > 63. My point is that once a leader is producing a
>> message
>> > in
>> > >> the
>> > >> > > new
>> > >> > > > >> > > partition_epoch, ideally, we should associate the new
>> > offsets
>> > >> > with
>> > >> > > > the
>> > >> > > > >> > new
>> > >> > > > >> > > partition_epoch. Otherwise, the offset_epoch won't be
>> the
>> > >> > correct
>> > >> > > > >> unique
>> > >> > > > >> > > identifier (useful for solving other problems mentioned
>> > >> above).
>> > >> > I
>> > >> > > > was
>> > >> > > > >> > > originally thinking that the leader will include the
>> > >> > > > >> > > partition_epoch from the metadata cache in the fetch
>> > >> > > > >> > > response. It's just that
>> right
>> > >> now,
>> > >> > > > >> metadata
>> > >> > > > >> > > cache is updated on UpdateMetadataRequest, which
>> typically
>> > >> > happens
>> > >> > > > >> after
>> > >> > > > >> > > the LeaderAndIsrRequest. Another approach is for the
>> leader
>> > >> to
>> > >> > > cache
>> > >> > > > >> the
>> > >> > > > >> > > partition_epoch in the Partition object and return that
>> > >> (instead
>> > >> > > of
>> > >> > > > >> the
>> > >> > > > >> > one
>> > >> > > > >> > > in metadata cache) in the fetch response.
>> > >> > > > >> > >
>> > >> > > > >> > > 65. It seems to me that the global_epoch and the
>> > >> partition_epoch
>> > >> > > > have
>> > >> > > > >> > > different purposes. A partition_epoch has the benefit
>> that
>> > it
>> > >> > (1)
>> > >> > > > can
>> > >> > > > >> be
>> > >> > > > >> > > used to form a unique identifier for a message and (2)
>> can
>> > be
>> > >> > used
>> > >> > > > to
>> > >> > > > >> > > solve other
>> > >> > > > >> > > corner case problems in the future. I am not sure having
>> > >> just a
>> > >> > > > >> > > global_epoch can achieve these. global_epoch is useful
>> to
>> > >> > > determine
>> > >> > > > >> which
>> > >> > > > >> > > version of the metadata is newer, especially with topic
>> > >> > deletion.
>> > >> > > > >> > >
>> > >> > > > >> > > Thanks,
>> > >> > > > >> > >
>> > >> > > > >> > > Jun
>> > >> > > > >> > >
>> > >> > > > >> > > On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <
>> > >> lindon...@gmail.com>
>> > >> > > > >> wrote:
>> > >> > > > >> > >
>> > >> > > > >> > > > Regarding the use of the global epoch in 65), it is very
>> > >> > > > >> > > > similar to the proposal of the metadata_epoch we discussed
>> > >> > > > >> > > > earlier. The main difference is that this epoch is
>> > >> > > > >> > > > incremented when we create/expand/delete a topic and does
>> > >> > > > >> > > > not change when the controller re-sends metadata.
>> > >> > > > >> > > >
>> > >> > > > >> > > > I looked at our previous discussion. It seems that we
>> > >> > > > >> > > > prefer the partition_epoch over the metadata_epoch because
>> > >> > > > >> > > > 1) we prefer not to have an ever-growing metadata_epoch
>> > >> > > > >> > > > and 2) we can reset offsets better when a topic is
>> > >> > > > >> > > > re-created. The use of a global topic_epoch avoids the
>> > >> > > > >> > > > drawback of a quickly ever-growing metadata_epoch. Though
>> > >> > > > >> > > > the global
>> > >> epoch
>> > >> > > does
>> > >> > > > >> not
>> > >> > > > >> > > allow
>> > >> > > > >> > > > us to recognize the invalid offset committed before
>> the
>> > >> topic
>> > >> > > > >> > > re-creation,
>> > >> > > > >> > > > we can probably just delete the offset when we delete
>> a
>> > >> topic.
>> > >> > > > Thus
>> > >> > > > >> I
>> > >> > > > >> > am
>> > >> > > > >> > > > not very sure whether it is still worthwhile to have a
>> > >> > > > per-partition
>> > >> > > > >> > > > partition_epoch if the metadata already has the global
>> > >> epoch.
>> > >> > > > >> > > >
>> > >> > > > >> > > >
>> > >> > > > >> > > > On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <
>> > >> lindon...@gmail.com
>> > >> > >
>> > >> > > > >> wrote:
>> > >> > > > >> > > >
>> > >> > > > >> > > > > Hey Jun,
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Thanks so much. These comments are very useful. Please
>> > >> > > > >> > > > > see my comments below.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <
>> > >> j...@confluent.io>
>> > >> > > > wrote:
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >> Hi, Dong,
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> Thanks for the updated KIP. A few more comments.
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> 60. Perhaps having a partition epoch is more
>> flexible
>> > >> since
>> > >> > > in
>> > >> > > > >> the
>> > >> > > > >> > > > future,
>> > >> > > > >> > > > >> we may support deleting a partition as well.
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Yeah I have considered this. I think we can probably
>> > >> > > > >> > > > > still support deleting a partition by using the
>> > >> > > > >> > > > > topic_epoch -- when a partition of a topic is deleted or
>> > >> > > > >> > > > > created, the epoch of all partitions of this topic will
>> > >> > > > >> > > > > be incremented by 1. Therefore, if that partition is
>> > >> > > > >> > > > > re-created later, the epoch of that partition will still
>> > >> > > > >> > > > > be larger than its epoch before the deletion, which
>> > >> > > > >> > > > > still allows the client to order the metadata for the
>> > >> > > > >> > > > > purpose of this KIP. Does this sound reasonable?
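>> > >> > > > >> > > > >
>> > >> > > > >> > > > > (A tiny illustration of the bump rule above; a purely
>> > >> > > > >> > > > > hypothetical helper, not KIP code:)
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > import java.util.HashMap;
>> > >> > > > >> > > > > import java.util.Map;
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > final class PartitionEpochs {
>> > >> > > > >> > > > >     // When any partition of the topic is created or
>> > >> > > > >> > > > >     // deleted, increment the epoch of every partition.
>> > >> > > > >> > > > >     static Map<Integer, Integer> bumpAll(Map<Integer, Integer> epochs) {
>> > >> > > > >> > > > >         Map<Integer, Integer> bumped = new HashMap<>();
>> > >> > > > >> > > > >         epochs.forEach((p, e) -> bumped.put(p, e + 1));
>> > >> > > > >> > > > >         return bumped;
>> > >> > > > >> > > > >     }
>> > >> > > > >> > > > > }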
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > The advantage of using a topic_epoch instead of a
>> > >> > > > >> > > > > partition_epoch is that the size of the
>> > >> > > > >> > > > > /brokers/topics/[topic] znode and the request/response
>> > >> > > > >> > > > > size can be smaller. We have a limit on the maximum size
>> > >> > > > >> > > > > of a znode (typically 1MB). Using a partition epoch can
>> > >> > > > >> > > > > effectively reduce the number of partitions that can be
>> > >> > > > >> > > > > described by the /brokers/topics/[topic] znode.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > One use-case of the partition_epoch is for the client
>> > >> > > > >> > > > > to detect that the committed offset, either from the
>> > >> > > > >> > > > > Kafka offset topic or from an external store, is invalid
>> > >> > > > >> > > > > after partition deletion and re-creation. However, it
>> > >> > > > >> > > > > seems that we can also address this use-case with other
>> > >> > > > >> > > > > approaches. For example, when AdminClient deletes
>> > >> > > > >> > > > > partitions, it can also delete the committed offsets for
>> > >> > > > >> > > > > those partitions from the offset topic. If a user stores
>> > >> > > > >> > > > > offsets externally, it might make sense for the user to
>> > >> > > > >> > > > > similarly remove offsets of related partitions after
>> > >> > > > >> > > > > these partitions are deleted. So I am not sure that we
>> > >> > > > >> > > > > should use partition_epoch in this KIP.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> 61. It seems that the leader epoch returned in the
>> > >> > > > >> > > > >> position() call should be the leader epoch returned in
>> > >> > > > >> > > > >> the fetch response, not the one in the metadata cache
>> > >> > > > >> > > > >> of the client.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > I think this is a good idea. Just to double check, this
>> > >> > > > >> > > > > change does not affect the correctness or performance of
>> > >> > > > >> > > > > this KIP. But it can be useful if we want to use the
>> > >> > > > >> > > > > leader_epoch to better handle the offset reset in case
>> > >> > > > >> > > > > of unclean leader election, which is listed in the
>> > >> > > > >> > > > > future work. Is this understanding correct?
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > I have updated the KIP to specify that the leader_epoch
>> > >> > > > >> > > > > returned by position() should be the largest
>> > >> > > > >> > > > > leader_epoch of those already-consumed messages whose
>> > >> > > > >> > > > > offset < position. If no message has been consumed since
>> > >> > > > >> > > > > consumer initialization, the leader_epoch from seek() or
>> > >> > > > >> > > > > OffsetFetchResponse should be used. The offset included
>> > >> > > > >> > > > > in the OffsetCommitRequest will also be determined in a
>> > >> > > > >> > > > > similar manner.
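>> > >> > > > >> > > > >
>> > >> > > > >> > > > > (A hypothetical sketch of that tracking logic; the names
>> > >> > > > >> > > > > are illustrative:)
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > final class PositionEpochTracker {
>> > >> > > > >> > > > >     private int largestConsumedLeaderEpoch = -1;
>> > >> > > > >> > > > >     private int epochFromSeekOrOffsetFetch = -1;
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >     void onSeek(int leaderEpoch) {
>> > >> > > > >> > > > >         epochFromSeekOrOffsetFetch = leaderEpoch;
>> > >> > > > >> > > > >     }
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >     // called for each consumed record with offset < position
>> > >> > > > >> > > > >     void onRecordConsumed(int recordLeaderEpoch) {
>> > >> > > > >> > > > >         largestConsumedLeaderEpoch = Math.max(
>> > >> > > > >> > > > >             largestConsumedLeaderEpoch, recordLeaderEpoch);
>> > >> > > > >> > > > >     }
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >     // epoch reported by position(): largest consumed epoch,
>> > >> > > > >> > > > >     // else the epoch from seek()/OffsetFetchResponse
>> > >> > > > >> > > > >     int leaderEpochForPosition() {
>> > >> > > > >> > > > >         return largestConsumedLeaderEpoch >= 0
>> > >> > > > >> > > > >             ? largestConsumedLeaderEpoch
>> > >> > > > >> > > > >             : epochFromSeekOrOffsetFetch;
>> > >> > > > >> > > > >     }
>> > >> > > > >> > > > > }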
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> 62. I am wondering if we should return the
>> partition
>> > >> epoch
>> > >> > in
>> > >> > > > the
>> > >> > > > >> > > fetch
>> > >> > > > >> > > > >> response as well. In the current proposal, if a
>> topic
>> > is
>> > >> > > > >> recreated
>> > >> > > > >> > and
>> > >> > > > >> > > > the
>> > >> > > > >> > > > >> new leader is on the same broker as the old one,
>> there
>> > >> is
>> > >> > > > >> nothing to
>> > >> > > > >> > > > force
>> > >> > > > >> > > > >> the metadata refresh in the client. So, the client
>> may
>> > >> > still
>> > >> > > > >> > associate
>> > >> > > > >> > > > the
>> > >> > > > >> > > > >> offset with the old partition epoch.
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Could you help me understand the problem if a client
>> > >> > > associates
>> > >> > > > >> old
>> > >> > > > >> > > > > partition_epoch (or the topic_epoch as of the
>> current
>> > >> KIP)
>> > >> > > with
>> > >> > > > >> the
>> > >> > > > >> > > > offset?
>> > >> > > > >> > > > > The main purpose of the topic_epoch is to be able to
>> > drop
>> > >> > > > >> > leader_epoch
>> > >> > > > >> > > > to 0
>> > >> > > > >> > > > > after a partition is deleted and re-created. I guess
>> > you
>> > >> may
>> > >> > > be
>> > >> > > > >> > > thinking
>> > >> > > > >> > > > > about using the partition_epoch to detect that the
>> > >> committed
>> > >> > > > >> offset
>> > >> > > > >> > is
>> > >> > > > >> > > > > invalid? In that case, I am wondering if the
>> > alternative
>> > >> > > > approach
>> > >> > > > >> > > > described
>> > >> > > > >> > > > > in 60) would be reasonable.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> 63. There is some subtle coordination between the
>> > >> > > > >> > LeaderAndIsrRequest
>> > >> > > > >> > > > and
>> > >> > > > >> > > > >> UpdateMetadataRequest. Currently, when a leader
>> > changes,
>> > >> > the
>> > >> > > > >> > > controller
>> > >> > > > >> > > > >> first sends the LeaderAndIsrRequest to the assigned
>> > >> > replicas
>> > >> > > > and
>> > >> > > > >> the
>> > >> > > > >> > > > >> UpdateMetadataRequest to every broker. So, there
>> could
>> > >> be a
>> > >> > > > small
>> > >> > > > >> > > window
>> > >> > > > >> > > > >> when the leader already receives the new partition
>> > >> epoch in
>> > >> > > the
>> > >> > > > >> > > > >> LeaderAndIsrRequest, but the metadata cache in the
>> > >> broker
>> > >> > > > hasn't
>> > >> > > > >> > been
>> > >> > > > >> > > > >> updated with the latest partition epoch. Not sure
>> > what's
>> > >> > the
>> > >> > > > best
>> > >> > > > >> > way
>> > >> > > > >> > > to
>> > >> > > > >> > > > >> address this issue. Perhaps we can update the
>> metadata
>> > >> > cache
>> > >> > > on
>> > >> > > > >> the
>> > >> > > > >> > > > broker
>> > >> > > > >> > > > >> with both LeaderAndIsrRequest and
>> > UpdateMetadataRequest.
>> > >> > The
>> > >> > > > >> > challenge
>> > >> > > > >> > > > is
>> > >> > > > >> > > > >> that the two have slightly different data. For
>> > example,
>> > >> > only
>> > >> > > > the
>> > >> > > > >> > > latter
>> > >> > > > >> > > > >> has
>> > >> > > > >> > > > >> all endpoints.
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > I am not sure whether this is a problem. Could you
>> > >> > > > >> > > > > explain a bit more what specific problem this small
>> > >> > > > >> > > > > window can cause?
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Since a client can fetch metadata from any broker in the
>> > >> > > > >> > > > > cluster, and given that different brokers receive
>> > >> > > > >> > > > > requests (e.g. LeaderAndIsrRequest and
>> > >> > > > >> > > > > UpdateMetadataRequest) in arbitrary order, the metadata
>> > >> > > > >> > > > > received by a client can be in arbitrary order (either
>> > >> > > > >> > > > > newer or older) compared to the broker's leadership
>> > >> > > > >> > > > > state, even if a given broker receives
>> > >> > > > >> > > > > LeaderAndIsrRequest and UpdateMetadataRequest
>> > >> > > > >> > > > > simultaneously. So I am not sure it is useful to update
>> > >> > > > >> > > > > the broker's cache with LeaderAndIsrRequest.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >> 64. The enforcement of leader epoch in Offset
>> commit:
>> > We
>> > >> > > allow
>> > >> > > > a
>> > >> > > > >> > > > consumer
>> > >> > > > >> > > > >> to set an arbitrary offset. So it's possible for
>> > >> offsets or
>> > >> > > > >> leader
>> > >> > > > >> > > epoch
>> > >> > > > >> > > > >> to
>> > >> > > > >> > > > >> go backwards. I am not sure if we could always
>> enforce
>> > >> that
>> > >> > > the
>> > >> > > > >> > leader
>> > >> > > > >> > > > >> epoch only goes up on the broker.
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Sure. I have removed this check from the KIP.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > BTW, we can probably still ensure that the leader_epoch
>> > >> > > > >> > > > > always increases if the leader_epoch used with the
>> > >> > > > >> > > > > offset commit is the max(leader_epoch of the message
>> > >> > > > >> > > > > with offset = the committed offset - 1, the largest
>> > >> > > > >> > > > > known leader_epoch from the metadata). But I don't have
>> > >> > > > >> > > > > a good use-case for this alternative definition. So I
>> > >> > > > >> > > > > chose to keep the KIP simple rather than requiring the
>> > >> > > > >> > > > > leader_epoch to always increase.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >> 65. Good point on handling missing partition epoch
>> due
>> > >> to
>> > >> > > topic
>> > >> > > > >> > > > deletion.
>> > >> > > > >> > > > >> Another potential way to address this is to
>> > additionally
>> > >> > > > >> propagate
>> > >> > > > >> > the
>> > >> > > > >> > > > >> global partition epoch to brokers and the clients.
>> > This
>> > >> > way,
>> > >> > > > >> when a
>> > >> > > > >> > > > >> partition epoch is missing, we can use the global
>> > >> partition
>> > >> > > > >> epoch to
>> > >> > > > >> > > > >> reason
>> > >> > > > >> > > > >> about which metadata is more recent.
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > This is a great idea. The global epoch can be used
>> to
>> > >> order
>> > >> > > the
>> > >> > > > >> > > metadata
>> > >> > > > >> > > > > and help us recognize the more recent metadata if a
>> > topic
>> > >> > (or
>> > >> > > > >> > > partition)
>> > >> > > > >> > > > is
>> > >> > > > >> > > > > deleted and re-created.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Actually, it seems we only need to propagate the global
>> > >> > > > >> > > > > epoch to brokers and clients without propagating this
>> > >> > > > >> > > > > epoch on a per-topic or per-partition basis. Doing so
>> > >> > > > >> > > > > would simplify the interface changes made in this KIP.
>> > >> > > > >> > > > > Does this approach sound reasonable?
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >> 66. A client may also get an offset by time using
>> the
>> > >> > > > >> > offsetForTimes()
>> > >> > > > >> > > > >> api.
>> > >> > > > >> > > > >> So, we probably want to include
>> offsetInternalMetadata
>> > >> in
>> > >> > > > >> > > > >> OffsetAndTimestamp
>> > >> > > > >> > > > >> as well.
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > You are right. This probably also requires us to
>> change
>> > >> the
>> > >> > > > >> > > > > ListOffsetRequest as well. I will update the KIP
>> after
>> > we
>> > >> > > agree
>> > >> > > > on
>> > >> > > > >> > the
>> > >> > > > >> > > > > solution for 65).
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> 67. InternalMetadata can be a bit confusing with the
>> > >> > > > >> > > > >> metadata field already there. Perhaps we can just call
>> > >> > > > >> > > > >> it OffsetEpoch. It might be useful to make OffsetEpoch
>> > >> > > > >> > > > >> printable at least for debugging purposes. Once you do
>> > >> > > > >> > > > >> that, we are already exposing the internal fields. So,
>> > >> > > > >> > > > >> not sure if it's worth hiding them. If we do want to
>> > >> > > > >> > > > >> hide them, perhaps we can have something like the
>> > >> > > > >> > > > >> following. The binary encoding is probably more
>> > >> > > > >> > > > >> efficient than JSON for external storage.
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> class OffsetEpoch {
>> > >> > > > >> > > > >>   public static OffsetEpoch decode(byte[] bytes);
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >>   public byte[] encode();
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >>   public String toString();
>> > >> > > > >> > > > >> }
>> > >> > > > >> > > > >>
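>> > >> > > > >> > > > >> (For illustration, hypothetical usage by an application
>> > >> > > > >> > > > >> that stores offsets externally might be:)
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> // store the encoded epoch next to the offset in the
>> > >> > > > >> > > > >> // external store
>> > >> > > > >> > > > >> byte[] stored = offsetEpoch.encode();
>> > >> > > > >> > > > >> // later, decode purely for debugging/log output
>> > >> > > > >> > > > >> System.out.println(OffsetEpoch.decode(stored).toString());
>> > >> > > > >> > > > >>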
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Thanks much. I like this solution. I have updated
>> the
>> > KIP
>> > >> > > > >> > accordingly.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> Jun
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <
>> > >> > > lindon...@gmail.com>
>> > >> > > > >> > wrote:
>> > >> > > > >> > > > >>
>> > >> > > > >> > > > >> > Hey Jason,
>> > >> > > > >> > > > >> >
>> > >> > > > >> > > > >> > Certainly. This sounds good. I have updated the KIP
>> > >> > > > >> > > > >> > to clarify that the global epoch will be incremented
>> > >> > > > >> > > > >> > by 1 each time a topic is deleted.
>> > >> > > > >> > > > >> >
>> > >> > > > >> > > > >> > Thanks,
>> > >> > > > >> > > > >> > Dong
>> > >> > > > >> > > > >> >
>> > >> > > > >> > > > >> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <
>> > >> > > > >> > ja...@confluent.io
>> > >> > > > >> > > >
>> > >> > > > >> > > > >> > wrote:
>> > >> > > > >> > > > >> >
>> > >> > > > >> > > > >> > > Hi Dong,
>> > >> > > > >> > > > >> > >
>> > >> > > > >> > > > >> > >
>> > >> > > > >> > > > >> > > I think your approach will allow user to
>> > distinguish
>> > >> > > > between
>> > >> > > > >> the
>> > >> > > > >> > > > >> metadata
>> > >> > > > >> > > > >> > > > before and after the topic deletion. I also
>> > agree
>> > >> > that
>> > >> > > > this
>> > >> > > > >> > can
>> > >> > > > >> > > be
>> > >> > > > >> > > > >> > > > potentially be useful to user. I am just not
>> > very
>> > >> > sure
>> > >> > > > >> whether
>> > >> > > > >> > > we
>> > >> > > > >> > > > >> > already
>> > >> > > > >> > > > >> > > > have a good use-case to make the additional
>> > >> > complexity
>> > >> > > > >> > > worthwhile.
>> > >> > > > >> > > > >> It
>> > >> > > > >> > > > >> > > seems
>> > >> > > > >> > > > >> > > > that this feature is kind of independent of
>> the
>> > >> main
>> > >> > > > >> problem
>> > >> > > > >> > of
>> > >> > > > >> > > > this
>> > >> > > > >> > > > >> > KIP.
>> > >> > > > >> > > > >> > > > Could we add this as a future work?
>> > >> > > > >> > > > >> > >
>> > >> > > > >> > > > >> > >
>> > >> > > > >> > > > >> > > Do you think it's fair if we bump the topic
>> epoch
>> > on
>> > >> > > > deletion
>> > >> > > > >> > and
>> > >> > > > >> > > > >> leave
>> > >> > > > >> > > > >> > > propagation of the epoch for deleted topics for
>> > >> future
>> > >> > > > work?
>> > >> > > > >> I
>> > >> > > > >> > > don't
>> > >> > > > >> > > > >> > think
>> > >> > > > >> > > > >> > > this adds much complexity and it makes the
>> > behavior
>> > >> > > > >> consistent:
>> > >> > > > >> > > > every
>> > >> > > > >> > > > >> > topic
>> > >> > > > >> > > > >> > > mutation results in an epoch bump.
>> > >> > > > >> > > > >> > >
>> > >> > > > >> > > > >> > > Thanks,
>> > >> > > > >> > > > >> > > Jason
>> > >> > > > >> > > > >> > >
>> > >> > > > >> > > > >> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <
>> > >> > > > >> lindon...@gmail.com>
>> > >> > > > >> > > > wrote:
>> > >> > > > >> > > > >> > >
>> > >> > > > >> > > > >> > > > Hey Ismael,
>> > >> > > > >> > > > >> > > >
>> > >> > > > >> > > > >> > > > I guess we actually need the user to see this
>> > >> > > > >> > > > >> > > > field so that the user can store this value in the
>> > >> > > > >> > > > >> > > > external store together with the offset. We just
>> > >> > > > >> > > > >> > > > prefer the value to be opaque to discourage most
>> > >> > > > >> > > > >> > > > users from interpreting it. One more advantage of
>> > >> > > > >> > > > >> > > > using such an opaque field is being able to evolve
>> > >> > > > >> > > > >> > > > the information (or schema) of this value without
>> > >> > > > >> > > > >> > > > changing the consumer API in the future.
>> > >> > > > >> > > > >> > > >
>> > >> > > > >> > > > >> > > > I also think it is probably OK for the user to be
>> > >> > > > >> > > > >> > > > able to interpret this value, particularly for
>> > >> > > > >> > > > >> > > > those advanced users.
>> > >> > > > >> > > > >> > > >
>> > >> > > > >> > > > >> > > > Thanks,
>> > >> > > > >> > > > >> > > > Dong
>> > >> > > > >> > > > >> > > >
>> > >> > > > >> > > > >> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <
>> > >> > > > >> > ism...@juma.me.uk>
>> > >> > > > >> > > > >> wrote:
>> > >> > > > >> > > > >> > > >
>> > >> > > > >> > > > >> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason
>> > Gustafson
>> > >> <
>> > >> > > > >> > > > >> ja...@confluent.io>
>> > >> > > > >> > > > >> > > > > wrote:
>> > >> > > > >> > > > >> > > > > >
>> > >> > > > >> > > > >> > > > > > class OffsetAndMetadata {
>> > >> > > > >> > > > >> > > > > >   long offset;
>> > >> > > > >> > > > >> > > > > >   byte[] offsetMetadata;
>> > >> > > > >> > > > >> > > > > >   String metadata;
>> > >> > > > >> > > > >> > > > > > }
>> > >> > > > >> > > > >> > > > >
>> > >> > > > >> > > > >> > > > >
>> > >> > > > >> > > > >> > > > > > Admittedly, the naming is a bit annoying,
>> > but
>> > >> we
>> > >> > > can
>> > >> > > > >> > > probably
>> > >> > > > >> > > > >> come
>> > >> > > > >> > > > >> > up
>> > >> > > > >> > > > >> > > > > with
>> > >> > > > >> > > > >> > > > > > something better. Internally the byte
>> array
>> > >> would
>> > >> > > > have
>> > >> > > > >> a
>> > >> > > > >> > > > >> version.
>> > >> > > > >> > > > >> > If
>> > >> > > > >> > > > >> > > in
>> > >> > > > >> > > > >> > > > > the
>> > >> > > > >> > > > >> > > > > > future we have anything else we need to
>> add,
>> > >> we
>> > >> > can
>> > >> > > > >> update
>> > >> > > > >> > > the
>> > >> > > > >> > > > >> > > version
>> > >> > > > >> > > > >> > > > > and
>> > >> > > > >> > > > >> > > > > > we wouldn't need any new APIs.
>> > >> > > > >> > > > >> > > > > >
>> > >> > > > >> > > > >> > > > >
>> > >> > > > >> > > > >> > > > > We can also add fields to a class in a
>> > >> compatible
>> > >> > > way.
>> > >> > > > >> So,
>> > >> > > > >> > it
>> > >> > > > >> > > > >> seems
>> > >> > > > >> > > > >> > to
>> > >> > > > >> > > > >> > > me
>> > >> > > > >> > > > >> > > > > that the main advantage of the byte array
>> is
>> > >> that
>> > >> > > it's
>> > >> > > > >> > opaque
>> > >> > > > >> > > to
>> > >> > > > >> > > > >> the
>> > >> > > > >> > > > >> > > > user.
>> > >> > > > >> > > > >> > > > > Is that correct? If so, we could also add
>> any
>> > >> > opaque
>> > >> > > > >> > metadata
>> > >> > > > >> > > > in a
>> > >> > > > >> > > > >> > > > subclass
>> > >> > > > >> > > > >> > > > > so that users don't even see it (unless
>> they
>> > >> cast
>> > >> > it,
>> > >> > > > but
>> > >> > > > >> > then
>> > >> > > > >> > > > >> > they're
>> > >> > > > >> > > > >> > > on
>> > >> > > > >> > > > >> > > > > their own).
>> > >> > > > >> > > > >> > > > >
>> > >> > > > >> > > > >> > > > > Ismael
>> > >> > > > >> > > > >> > > > >
>> > >> > > > >> > > > >> > > > > The corresponding seek() and position()
>> APIs
>> > >> might
>> > >> > > look
>> > >> > > > >> > > > something
>> > >> > > > >> > > > >> > like
>> > >> > > > >> > > > >> > > > > this:
>> > >> > > > >> > > > >> > > > > >
>> > >> > > > >> > > > >> > > > > > void seek(TopicPartition partition, long
>> > >> offset,
>> > >> > > > byte[]
>> > >> > > > >> > > > >> > > > offsetMetadata);
>> > >> > > > >> > > > >> > > > > > byte[] positionMetadata(TopicPartition
>> > >> > partition);
>> > >> > > > >> > > > >> > > > > >
>> > >> > > > >> > > > >> > > > > > What do you think?
>> > >> > > > >> > > > >> > > > > >
>> > >> > > > >> > > > >> > > > > > Thanks,
>> > >> > > > >> > > > >> > > > > > Jason
>> > >> > > > >> > > > >> > > > > >
>> > >> > > > >> > > > >> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin
>> <
>> > >> > > > >> > > lindon...@gmail.com
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >> > > wrote:
>> > >> > > > >> > > > >> > > > > >
>> > >> > > > >> > > > >> > > > > > > Hey Jun, Jason,
>> > >> > > > >> > > > >> > > > > > >
>> > >> > > > >> > > > >> > > > > > > Thanks much for all the feedback. I
>> have
>> > >> > updated
>> > >> > > > the
>> > >> > > > >> KIP
>> > >> > > > >> > > > >> based on
>> > >> > > > >> > > > >> > > the
>> > >> > > > >> > > > >> > > > > > > latest discussion. Can you help check
>> > >> whether
>> > >> > it
>> > >> > > > >> looks
>> > >> > > > >> > > good?
>> > >> > > > >> > > > >> > > > > > >
>> > >> > > > >> > > > >> > > > > > > Thanks,
>> > >> > > > >> > > > >> > > > > > > Dong
>> > >> > > > >> > > > >> > > > > > >
>> > >> > > > >> > > > >> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong
>> Lin <
>> > >> > > > >> > > > lindon...@gmail.com
>> > >> > > > >> > > > >> >
>> > >> > > > >> > > > >> > > > wrote:
>> > >> > > > >> > > > >> > > > > > >
>> > >> > > > >> > > > >> > > > > > > > Hey Jun,
>> > >> > > > >> > > > >> > > > > > > >
>> > >> > > > >> > > > > > > > Hmm... thinking about this more, I am not sure
>> > >> > > > >> > > > > > > > that the proposed API is sufficient. For users that
>> > >> > > > >> > > > > > > > store offsets externally, we probably need an extra
>> > >> > > > >> > > > > > > > API to return the leader_epoch and partition_epoch
>> > >> > > > >> > > > > > > > for all partitions that consumers are consuming. I
>> > >> > > > >> > > > > > > > suppose these users currently use position() to get
>> > >> > > > >> > > > > > > > the offset. Thus we probably need a new method
>> > >> > > > >> > > > > > > > positionWithEpoch(..) to return <offset,
>> > >> > > > >> > > > > > > > partition_epoch, leader_epoch>. Does this sound
>> > >> > > > >> > > > > > > > reasonable?
>> > >> > > > >> > > > >> > > > > > > >
>> > >> > > > >> > > > >> > > > > > > > Thanks,
>> > >> > > > >> > > > >> > > > > > > > Dong
>> > >> > > > >> > > > >> > > > > > > >
>> > >> > > > >> > > > >> > > > > > > >
>> > >> > > > >> > > > >> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun
>> Rao
>> > <
>> > >> > > > >> > > j...@confluent.io
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >> > > wrote:
>> > >> > > > >> > > > >> > > > > > > >
>> > >> > > > >> > > > >> > > > > > > >> Hi, Dong,
>> > >> > > > >> > > > >> > > > > > > >>
>> > >> > > > >> > > > >> > > > > > > >> Yes, that's what I am thinking.
>> > >> OffsetEpoch
>> > >> > > will
>> > >> > > > >> be
>> > >> > > > >> > > > >> composed
>> > >> > > > >> > > > >> > of
>> > >> > > > >> > > > >> > > > > > > >> (partition_epoch,
>> > >> > > > >> > > > >> > > > > > > >> leader_epoch).
>> > >> > > > >> > > > >> > > > > > > >>
>> > >> > > > >> > > > >> > > > > > > >> Thanks,
>> > >> > > > >> > > > >> > > > > > > >>
>> > >> > > > >> > > > >> > > > > > > >> Jun
>> > >> > > > >> > > > >> > > > > > > >>
>> > >> > > > >> > > > >> > > > > > > >>
>> > >> > > > >> > > > >> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong
>> > Lin
>> > >> <
>> > >> > > > >> > > > >> lindon...@gmail.com
>> > >> > > > >> > > > >> > >
>> > >> > > > >> > > > >> > > > > wrote:
>> > >> > > > >> > > > >> > > > > > > >>
>> > >> > > > >> > > > >> > > > > > > >> > Hey Jun,
>> > >> > > > >> > > > >> > > > > > > >> >
>> > >> > > > >> > > > > > > >> > Thanks much. I like the new API that you
>> > >> > > > >> > > > > > > >> > proposed. I am not sure what you exactly mean by
>> > >> > > > >> > > > > > > >> > offset_epoch. I suppose that we can use the pair
>> > >> > > > >> > > > > > > >> > of (partition_epoch, leader_epoch) as the
>> > >> > > > >> > > > > > > >> > offset_epoch, right?
>> > >> > > > >> > > > >> > > > > > > >> >
>> > >> > > > >> > > > >> > > > > > > >> > Thanks,
>> > >> > > > >> > > > >> > > > > > > >> > Dong
>> > >> > > > >> > > > >> > > > > > > >> >
>> > >> > > > >> > > > >> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM,
>> Jun
>> > >> Rao <
>> > >> > > > >> > > > >> j...@confluent.io>
>> > >> > > > >> > > > >> > > > wrote:
>> > >> > > > >> > > > >> > > > > > > >> >
>> > >> > > > >> > > > >> > > > > > > >> > > Hi, Dong,
>> > >> > > > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > >> > > > > > > >> > > Got it. The api that you
>> proposed
>> > >> works.
>> > >> > > The
>> > >> > > > >> > > question
>> > >> > > > >> > > > >> is
>> > >> > > > >> > > > >> > > > whether
>> > >> > > > >> > > > >> > > > > > > >> that's
>> > >> > > > >> > > > >> > > > > > > >> > the
>> > >> > > > >> > > > >> > > > > > > >> > > api that we want to have in the
>> > long
>> > >> > term.
>> > >> > > > My
>> > >> > > > >> > > concern
>> > >> > > > >> > > > >> is
>> > >> > > > >> > > > >> > > that
>> > >> > > > >> > > > >> > > > > > while
>> > >> > > > >> > > > >> > > > > > > >> the
>> > >> > > > >> > > > >> > > > > > > >> > api
>> > >> > > > >> > > > >> > > > > > > >> > > change is simple, the new api
>> seems
>> > >> > harder
>> > >> > > > to
>> > >> > > > >> > > explain
>> > >> > > > >> > > > >> and
>> > >> > > > >> > > > >> > > use.
>> > >> > > > >> > > > >> > > > > For
>> > >> > > > >> > > > >> > > > > > > >> > example,
>> > >> > > > >> > > > >> > > > > > > >> > > a consumer storing offsets
>> > externally
>> > >> > now
>> > >> > > > >> needs
>> > >> > > > >> > to
>> > >> > > > >> > > > call
>> > >> > > > >> > > > >> > > > > > > >> > > waitForMetadataUpdate() after
>> > calling
>> > >> > > > seek().
>> > >> > > > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > >> > > > > > > >> > > An alternative approach is to
>> make
>> > >> the
>> > >> > > > >> following
>> > >> > > > >> > > > >> > compatible
>> > >> > > > >> > > > >> > > > api
>> > >> > > > >> > > > >> > > > > > > >> changes
>> > >> > > > >> > > > >> > > > > > > >> > in
>> > >> > > > >> > > > >> > > > > > > >> > > Consumer.
>> > >> > > > >> > > > >> > > > > > > >> > > * Add an additional OffsetEpoch
>> > >> field in
>> > >> > > > >> > > > >> > OffsetAndMetadata.
>> > >> > > > >> > > > >> > > > (no
>> > >> > > > >> > > > >> > > > > > need
>> > >> > > > >> > > > >> > > > > > > >> to
>> > >> > > > >> > > > >> > > > > > > >> > > change the CommitSync() api)
>> > >> > > > >> > > > >> > > > > > > >> > > * Add a new api
>> seek(TopicPartition
>> > >> > > > partition,
>> > >> > > > >> > long
>> > >> > > > >> > > > >> > offset,
>> > >> > > > >> > > > >> > > > > > > >> OffsetEpoch
>> > >> > > > >> > > > >> > > > > > > >> > > offsetEpoch). We can potentially
>> > >> > deprecate
>> > >> > > > the
>> > >> > > > >> > old
>> > >> > > > >> > > > api
>> > >> > > > >> > > > >> > > > > > > >> > seek(TopicPartition
>> > >> > > > >> > > > >> > > > > > > >> > > partition, long offset) in the
>> > >> future.
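>> > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > > > > >> > > (For concreteness, a sketch of these compatible
>> > >> > > > >> > > > > > > >> > > changes; the signatures are illustrative, not
>> > >> > > > >> > > > > > > >> > > final:)
>> > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > > > > >> > > public class OffsetAndMetadata {
>> > >> > > > >> > > > > > > >> > >   private final long offset;
>> > >> > > > >> > > > > > > >> > >   private final String metadata;
>> > >> > > > >> > > > > > > >> > >   // new field; commitSync() itself is unchanged
>> > >> > > > >> > > > > > > >> > >   private final OffsetEpoch offsetEpoch;
>> > >> > > > >> > > > > > > >> > > }
>> > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > > > > >> > > // new overload on Consumer; the old two-argument
>> > >> > > > >> > > > > > > >> > > // seek() could be deprecated later
>> > >> > > > >> > > > > > > >> > > void seek(TopicPartition partition, long offset,
>> > >> > > > >> > > > > > > >> > >           OffsetEpoch offsetEpoch);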
>> > >> > > > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > >> > > > > > > >> > > The alternative approach has
>> > similar
>> > >> > > amount
>> > >> > > > of
>> > >> > > > >> > api
>> > >> > > > >> > > > >> changes
>> > >> > > > >> > > > >> > > as
>> > >> > > > >> > > > >> > > > > > yours
>> > >> > > > >> > > > >> > > > > > > >> but
>> > >> > > > >> > > > >> > > > > > > >> > has
>> > >> > > > >> > > > >> > > > > > > >> > > the following benefits.
>> > >> > > > >> > > > >> > > > > > > >> > > 1. The api works in a similar
>> way
>> > as
>> > >> how
>> > >> > > > >> offset
>> > >> > > > >> > > > >> management
>> > >> > > > >> > > > >> > > > works
>> > >> > > > >> > > > >> > > > > > now
>> > >> > > > >> > > > >> > > > > > > >> and
>> > >> > > > >> > > > >> > > > > > > >> > is
>> > >> > > > >> > > > >> > > > > > > >> > > probably what we want in the
>> long
>> > >> term.
>> > >> > > > >> > > > >> > > > > > > >> > > 2. It can reset offsets better
>> when
>> > >> > there
>> > >> > > is
>> > >> > > > >> data
>> > >> > > > >> > > > loss
>> > >> > > > >> > > > >> due
>> > >> > > > >> > > > >> > > to
>> > >> > > > >> > > > >> > > > > > > unclean
>> > >> > > > >> > > > >> > > > > > > >> > > leader election or correlated
>> > replica
>> > >> > > > failure.
>> > >> > > > >> > > > >> > > > > > > >> > > 3. It can reset offsets better
>> when
>> > >> > topic
>> > >> > > is
>> > >> > > > >> > > > recreated.
>> > >> > > > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > >> > > > > > > >> > > Thanks,
>> > >> > > > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > >> > > > > > > >> > > Jun
>> > >> > > > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > >> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM,
>> > Dong
>> > >> > Lin <
>> > >> > > > >> > > > >> > > lindon...@gmail.com
>> > >> > > > >> > > > >> > > > >
>> > >> > > > >> > > > >> > > > > > > wrote:
>> > >> > > > >> > > > >> > > > > > > >> > >
>> > >> > > > >> > > > >> > > > > > > >> > > > Hey Jun,
>> > >> > > > >> > > > >> > > > > > > >> > > >
>> > >> > > > >> > > > >> > > > > > > >> > > > Yeah I agree that ideally we
>> > don't
>> > >> > want
>> > >> > > an
>> > >> > > > >> ever
>> > >> > > > >> > > > >> growing
>> > >> > > > >> > > > >> > > > global
>> > >> > > > >> > > > >> > > > > > > >> metadata
>> > >> > > > >> > > > >> > > > > > > >> > > > version. I just think it may
>> be
>> > >> more
>> > >> > > > >> desirable
>> > >> > > > >> > to
>> > >> > > > >> > > > >> keep
>> > >> > > > >> > > > >> > the
>> > >> > > > >> > > > >> > > > > > > consumer
>> > >> > > > >> > > > >> > > > > > > >> API
>> > >> > > > >> > > > >> > > > > > > >> > > > simple.
>> > >> > > > >> > > > >> > > > > > > >> > > >
>> > >> > > > >> > > > > > > >> > > > In my current proposal, the metadata
>> > >> > > > >> > > > > > > >> > > > version returned in the fetch response will
>> > >> > > > >> > > > > > > >> > > > be stored together with the offset. More
>> > >> > > > >> > > > > > > >> > > > specifically, the metadata_epoch in the new
>> > >> > > > >> > > > > > > >> > > > offset topic schema will be the largest
>> > >> > > > >> > > > > > > >> > > > metadata_epoch from all the
>> > >> > > > >> > > > > > > >> > > > MetadataResponse and FetchResponse ever
>> > >> > > > >> > > > > > > >> > > > received by this consumer.
>> > >> > > > >> > > > >> > > > > > > >> > > > We probably don't have to
>> change
>> > >> the
>> > >> > > > >> consumer
>> > >> > > > >> > API
>> > >> > > > >> > > > for
>> > >> > > > >> > > > >> > > > > > > >> > > > commitSync(Map<TopicPartition,
>> > >> > > > >> > > OffsetAndMetadata>).
>> > >> > > > >> > > > >> If
>> > >> > > > >> > > > >> > > user
>> > >> > > > >> > > > >> > > > > > calls
>> > >> > > > >> > > > >> > > > > > > >> > > > commitSync(...) to commit
>> offset
>> > 10
>> > >> > for
>> > >> > > a
>> > >> > > > >> given
>> > >> > > > >> > > > >> > partition,
>> > >> > > > >> > > > >> > > > for
>> > >> > > > >> > > > >> > > > > > > most
>> > >> > > > >> > > > >> > > > > > > >> > > > use-cases, this consumer
>> instance
>> > >> > should
>> > >> > > > >> have
>> > >> > > > >> > > > >> consumed
>> > >> > > > >> > > > >> > > > message
>> > >> > > > >> > > > >> > > > > > > with
>> > >> > > > >> > > > >> > > > > > > >> > > offset
>> > >> > > > >> > > > >> > > > > > > >> > > > 9 from this partition, in
>> which
>> > >> case
>> > >> > the
>> > >> > > > >> > consumer
>> > >> > > > >> > > > can
>> > >> > > > >> > > > >> > > > remember
>> > >> > > > >> > > > >> > > > > > and
>> > >> > > > >> > > > >> > > > > > > >> use
>> > >> > > > >> > > > >> > > > > > > >> > > the
>> > >> > > > >> > > > >> > > > > > > >> > > > metadata_epoch from the
>> > >> corresponding
>> > >> > > > >> > > FetchResponse
>> > >> > > > >> > > > >> when
>> > >> > > > >> > > > >> > > > > > > committing
>> > >> > > > >> > > > >> > > > > > > >> > > offset.
>> > >> > > > >> > > > >> > > > > > > >> > > > If user calls commitSync(..)
>> to
>> > >> commit
>> > >> > > > >> offset
>> > >> > > > >> > 10
>> > >> > > > >> > > > for
>> > >> > > > >> > > > >> a
>> > >> > > > >> > > > >> > > given
>> > >> > > > >> > > > >> > > > > > > >> partition
>> > >> > > > >> > > > >> > > > > > > >> > > > without having consumed the
>> > message
>> > >> > with
>> > >> > > > >> > offset 9
>> > >> > > > >> > > > >> using
>> > >> > > > >> > > > >> > > this
>> > >> > > > >> > > > >> > > > > > > >> consumer
>> > >> > > > >> > > > >> > > > > > > >> > > > instance, this is probably an
>> > >> advanced
>> > >> > > > >> > use-case.
>> > >> > > > >> > > In
>> > >> > > > >> > > > >> this
>> > >> > > > >> > > > >> > > > case
>> > >> > > > >> > > > >> > > > > > the
>> > >> > > > >> > > > >> > > > > > > >> > > advanced
>> > >> > > > >> > > > >> > > > > > > >> > > > user can retrieve the
>> > >> metadata_epoch
>> > >> > > using
>> > >> > > > >> the
>> > >> > > > >> > > > newly
>> > >> > > > >> > > > >> > added
>> > >> > > > >> > > > >> > > > > > > >> > > metadataEpoch()
>> > >> > > > >> > > > >> > > > > > > >> > > > API after it fetches the
>> message
>> > >> with
>> > >> > > > >> offset 9
>> > >> > > > >> > > > >> (probably
>> > >> > > > >> > > > >> > > > from
>> > >> > > > >> > > > >> > > > > > > >> another
>> > >> > > > >> > > > > > > >> > > > consumer instance) and encode this
>> > >> > > > >> > > > > > > >> > > > metadata_epoch in the string
>> > >> > > > >> > > > > > > >> > > > OffsetAndMetadata.metadata. Do you think
>> > >> > > > >> > > > > > > >> > > > this solution would work?
>> > >> > > > >> > > > >> > > > > > > >> > > >
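> To make this concrete, here is a minimal sketch of that commit path. The
> metadataEpoch() accessor and the "metadata_epoch=<n>" string encoding are
> illustrative assumptions for this discussion, not a finalized API:
>
>     import java.util.Collections;
>     import java.util.Map;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>     import org.apache.kafka.common.TopicPartition;
>
>     class EpochCommit {
>         // Commits the given offset while carrying the metadata_epoch in
>         // the existing metadata string, so commitSync() keeps its
>         // current signature.
>         static void commitWithEpoch(KafkaConsumer<?, ?> consumer,
>                                     TopicPartition tp, long offset,
>                                     long metadataEpoch) {
>             // Hypothetical encoding; a real scheme would be defined by
>             // the KIP.
>             String metadata = "metadata_epoch=" + metadataEpoch;
>             Map<TopicPartition, OffsetAndMetadata> offsets =
>                 Collections.singletonMap(
>                     tp, new OffsetAndMetadata(offset, metadata));
>             consumer.commitSync(offsets);
>         }
>     }
>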
> By "not sure that I fully understand your latest suggestion", are you
> referring to the solution related to unclean leader election using
> leader_epoch in my previous email?
>
> Thanks,
> Dong
>
> On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> Not sure that I fully understand your latest suggestion. Returning an
>> ever-growing global metadata version itself is not ideal, but is fine. My
>> question is whether the metadata version returned in the fetch response
>> needs to be stored together with the offset if offsets are stored
>> externally. If so, we also have to change the consumer API for
>> commitSync() and need to worry about compatibility. If we don't store the
>> metadata version together with the offset, then on a consumer restart it's
>> not clear how we can ensure the metadata in the consumer is high enough,
>> since there is no metadata version to compare with.
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Hey Jun,
>>>
>>> Thanks much for the explanation.
>>>
>>> I understand the advantage of partition_epoch over metadata_epoch. My
>>> current concern is that the use of leader_epoch and the partition_epoch
>>> requires considerable change to the consumer's public API to take care
>>> of the case where the user stores offsets externally. For example,
>>> *consumer*.*commitSync*(..) would have to take a map whose value is
>>> <offset, metadata, leader epoch, partition epoch>. *consumer*.*seek*(...)
>>> would also need leader_epoch and partition_epoch as parameters.
>>> Technically we can probably still make it work in a backward-compatible
>>> manner after careful design and discussion. But these changes can make
>>> the consumer's interface unnecessarily complex for most users, who do
>>> not store offsets externally.
>>>
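>>> For concreteness, a sketch of what such an expanded API could look like.
>>> These types and overloads do not exist today; they are only meant to
>>> illustrate the extra surface area:
>>>
>>>     import java.util.Map;
>>>     import org.apache.kafka.common.TopicPartition;
>>>
>>>     // Hypothetical per-partition value the consumer would have to
>>>     // accept if both epochs were exposed in the public API.
>>>     class OffsetAndEpochs {
>>>         final long offset;
>>>         final String metadata;
>>>         final int leaderEpoch;
>>>         final int partitionEpoch;
>>>         OffsetAndEpochs(long offset, String metadata,
>>>                         int leaderEpoch, int partitionEpoch) {
>>>             this.offset = offset;
>>>             this.metadata = metadata;
>>>             this.leaderEpoch = leaderEpoch;
>>>             this.partitionEpoch = partitionEpoch;
>>>         }
>>>     }
>>>
>>>     // Hypothetical consumer methods (sketch, not a real interface):
>>>     interface EpochAwareConsumer {
>>>         void commitSync(Map<TopicPartition, OffsetAndEpochs> offsets);
>>>         void seek(TopicPartition partition, long offset,
>>>                   int leaderEpoch, int partitionEpoch);
>>>     }
>>>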
>>> After thinking more about it, we can address all the problems discussed
>>> by only using the metadata_epoch, without introducing leader_epoch or
>>> the partition_epoch. The current KIP describes the changes to the
>>> consumer API and how the new API can be used if the user stores offsets
>>> externally. In order to address the scenario you described earlier, we
>>> can include metadata_epoch in the FetchResponse and the
>>> LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch
>>> from all the FetchResponses it has received. The metadata_epoch
>>> committed with the offset, either within or outside Kafka, should be the
>>> largest metadata_epoch across all FetchResponses and MetadataResponses
>>> ever received by this consumer.
>>>
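>>> As a minimal sketch of the bookkeeping this implies on the consumer side
>>> (names are illustrative only):
>>>
>>>     // Tracks the largest metadata_epoch observed across all
>>>     // FetchResponses and MetadataResponses; the value committed with
>>>     // an offset would be read from here.
>>>     class MetadataEpochTracker {
>>>         private long maxEpoch = -1L;
>>>
>>>         // Called whenever a FetchResponse or MetadataResponse arrives.
>>>         synchronized void observe(long epochFromResponse) {
>>>             maxEpoch = Math.max(maxEpoch, epochFromResponse);
>>>         }
>>>
>>>         // Epoch to attach to the next committed offset.
>>>         synchronized long epochForCommit() {
>>>             return maxEpoch;
>>>         }
>>>     }
>>>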
>>> The drawback of using only the metadata_epoch is that we cannot always
>>> do the smart offset reset in the case of unclean leader election, which
>>> you mentioned earlier. But in most cases, unclean leader election
>>> probably happens when the consumer is not rebalancing/restarting. In
>>> these cases, either the consumer is not directly affected by the unclean
>>> leader election since it is not consuming from the end of the log, or
>>> the consumer can derive the leader_epoch from the most recent message
>>> received before it sees OffsetOutOfRangeException. So I am not sure it
>>> is worth adding the leader_epoch to the consumer API to address the
>>> remaining corner case. What do you think?
>>>
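>>> For the recovery path, a rough sketch (leaderEpochOf() stands in for
>>> reading the epoch carried in the message format; it is not part of the
>>> current consumer API):
>>>
>>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>
>>>     class EpochRecovery {
>>>         // On OffsetOutOfRangeException, fall back to the epoch of the
>>>         // last record this consumer actually processed, if any.
>>>         static int epochForReset(ConsumerRecord<?, ?> lastConsumed,
>>>                                  int defaultEpoch) {
>>>             if (lastConsumed == null) {
>>>                 return defaultEpoch; // nothing consumed yet
>>>             }
>>>             return leaderEpochOf(lastConsumed);
>>>         }
>>>
>>>         // Hypothetical: would read the leader epoch from the record's
>>>         // batch header, which the public API does not expose today.
>>>         static int leaderEpochOf(ConsumerRecord<?, ?> record) {
>>>             throw new UnsupportedOperationException("illustrative only");
>>>         }
>>>     }
>>>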
>>> Thanks,
>>> Dong
>>>
>>> On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:
>>>
>>>> Hi, Dong,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> To solve the topic recreation issue, we could use either a global
>>>> metadata version or a partition-level epoch. But either one will be a
>>>> new concept, right? To me, the latter seems more natural. It also makes
>>>> it easier to detect if a consumer's offset is still valid after a topic
>>>> is recreated. As you pointed out, we don't need to store the partition
>>>> epoch in the message. The following is what I am thinking. When a
>>>> partition is created, we can assign a partition epoch from an
>>>> ever-increasing global counter and store it in
>>>> /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition
>>>> epoch is propagated to every broker. The consumer will be tracking a
>>>> tuple of <offset, leader epoch, partition epoch> for offsets. If a
>>>> topic is recreated, it's possible that a consumer's offset and leader
>>>> epoch still match those in the broker, but the partition epoch won't.
>>>> In this case, we can potentially still treat the consumer's offset as
>>>> out of range and reset the offset based on the offset reset policy in
>>>> the consumer. This seems harder to do with a global metadata version.
>>>>
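>>>> A minimal sketch of that validity check, with illustrative names (the
>>>> epoch plumbing itself is the new part this proposal would add):
>>>>
>>>>     // Decides whether a committed position is still usable after a
>>>>     // possible topic recreation. All names are illustrative.
>>>>     final class CommittedPosition {
>>>>         final long offset;
>>>>         final int leaderEpoch;
>>>>         final int partitionEpoch;
>>>>         CommittedPosition(long offset, int leaderEpoch,
>>>>                           int partitionEpoch) {
>>>>             this.offset = offset;
>>>>             this.leaderEpoch = leaderEpoch;
>>>>             this.partitionEpoch = partitionEpoch;
>>>>         }
>>>>
>>>>         // If the broker reports a different partition epoch, the topic
>>>>         // was recreated: treat the offset as out of range and fall
>>>>         // back to the consumer's auto.offset.reset policy.
>>>>         boolean isValidAgainst(int brokerPartitionEpoch) {
>>>>             return this.partitionEpoch == brokerPartitionEpoch;
>>>>         }
>>>>     }
>>>>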
>>>> Jun
>>>>
>>>> On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hey Jun,
>>>>>
>>>>> This is a very good example. After thinking through this in detail, I
>>>>> agree that we need to commit the offset together with the leader epoch
>>>>> in order to address this example.
>>>>>
>>>>> I think the remaining question is how to address the scenario where
>>>>> the topic is deleted and re-created. One possible solution is to
>>>>> commit the offset with both the leader epoch and the metadata version.
>>>>> The logic and the implementation of this solution do not require a new
>>>>> concept (e.g. partition epoch), and they do not require any change to
>>>>> the message format or the leader epoch. It also allows us to order the
>>>>> metadata in a straightforward manner, which may be useful in the
>>>>> future. So it may be a better solution than generating a random
>>>>> partition epoch every time we create a partition. Does this sound
>>>>> reasonable?
>>>>>
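>>>>> As a sketch, both values could ride along in the existing metadata
>>>>> string when offsets are committed (the encoding below is an assumption
>>>>> for illustration, not a proposed format):
>>>>>
>>>>>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>>>>>
>>>>>     class EpochEncoding {
>>>>>         // Packs the leader epoch and the metadata version into the
>>>>>         // metadata string; the offset itself stays in the offset
>>>>>         // field as today. On restart, the values can be parsed back
>>>>>         // out and compared against what the broker currently reports.
>>>>>         static OffsetAndMetadata withEpochs(long offset,
>>>>>                                             int leaderEpoch,
>>>>>                                             long metadataVersion) {
>>>>>             String metadata = "leader_epoch=" + leaderEpoch
>>>>>                 + ";metadata_version=" + metadataVersion;
>>>>>             return new OffsetAndMetadata(offset, metadata);
>>>>>         }
>>>>>     }
>>>>>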
>>>>> Previously one concern with using the metadata version was that the
>>>>> consumer would be forced to refresh metadata even if the metadata
>>>>> version is increased due to topics that the consumer is not interested
>>>>> in. Now I realize that this is probably not a problem. Currently the
>>>>> client will refresh metadata either due to an InvalidMetadataException
>>>>> in the response from the broker or due to metadata expiry. The
>>>>> addition of the metadata version should not increase the overhead of
>>>>> metadata refresh caused by InvalidMetadataException. If the client
>>>>> refreshes metadata due to expiry and it receives a metadata whose
>>>>> version is lower than the current metadata version, we can reject the
>>>>> metadata but still reset the metadata age, which essentially keeps the
>>>>> existing behavior in the client.
>>>>>
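>>>>> A minimal sketch of that version-gated update (all names are
>>>>> illustrative):
>>>>>
>>>>>     // Keeps the freshest metadata; a stale (lower-version) response
>>>>>     // still counts as a refresh, so the expiry timer is reset.
>>>>>     class VersionedMetadataCache {
>>>>>         private long version = -1L;
>>>>>         private long lastRefreshMs = 0L;
>>>>>
>>>>>         synchronized void onMetadataResponse(long responseVersion,
>>>>>                                              long nowMs) {
>>>>>             if (responseVersion >= version) {
>>>>>                 version = responseVersion; // accept newer metadata
>>>>>                 // ... update cached topic/partition state here ...
>>>>>             }
>>>>>             lastRefreshMs = nowMs; // reset the metadata age either way
>>>>>         }
>>>>>     }
>>>>>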
>>>>> Thanks much,
>>>>> Dong