Hey Jun,

Do you think the current KIP looks OK? I am wondering if we can open the voting thread.
Thanks!
Dong

On Fri, Jan 19, 2018 at 3:08 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

I think we can probably have a static method in a Util class to decode the byte[]. Both the KafkaConsumer implementation and the user application will be able to decode the byte array and log its content for debugging purposes. So it seems that we can still print the information we want; it is just not explicitly exposed in the consumer interface. Would this address the problem here?

Yeah, we can include OffsetEpoch in the AdminClient. Can this be added in KIP-222? Is there something you would like me to add in this KIP?

Thanks!
Dong

On Fri, Jan 19, 2018 at 3:00 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

The issue with using just byte[] for OffsetEpoch is that it won't be printable, which makes debugging harder.

Also, KIP-222 proposes a listGroupOffset() method in the AdminClient. If that gets adopted before this KIP, we probably want to include OffsetEpoch in the AdminClient too.

Thanks,

Jun

On Thu, Jan 18, 2018 at 6:30 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

I agree. I have updated the KIP to remove the class OffsetEpoch and replace OffsetEpoch with byte[] in the APIs that use it. Can you see if it looks good?

Thanks!
Dong

On Thu, Jan 18, 2018 at 6:07 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the updated KIP. It looks good to me now. The only remaining thing is OffsetEpoch: if we expose the individual fields in the class, we probably don't need the encode/decode methods; if we want to hide the details of OffsetEpoch, we probably don't want to expose the individual fields.
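A minimal sketch of the Util-style decoder mentioned above. The wire layout here (a version byte followed by partition_epoch and leader_epoch) and all names are assumptions for illustration, not part of the KIP:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a "static decode method in a Util class". The
// binary layout (version byte, partition_epoch, leader_epoch) is invented
// for illustration only.
class OffsetEpochUtil {

    // Encode with a leading version byte so the schema could evolve later
    // without changing the consumer API.
    public static byte[] encode(int partitionEpoch, int leaderEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 4);
        buf.put((byte) 0); // version 0
        buf.putInt(partitionEpoch);
        buf.putInt(leaderEpoch);
        return buf.array();
    }

    // Decode into a printable form; both the KafkaConsumer implementation
    // and a user application could call this to log the content when
    // debugging.
    public static String decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte version = buf.get();
        if (version != 0)
            throw new IllegalArgumentException("unknown version " + version);
        return "OffsetEpoch(partitionEpoch=" + buf.getInt()
                + ", leaderEpoch=" + buf.getInt() + ")";
    }
}
```

This keeps the byte[] opaque in the consumer interface while still making its content printable for debugging.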
Jun

On Wed, Jan 17, 2018 at 10:10 AM, Dong Lin <lindon...@gmail.com> wrote:

Thinking about point 61 more, I realize that the async ZooKeeper read may make it less of an issue for the controller to read more ZooKeeper nodes. Writing the partition_epoch in the per-partition znode makes it simpler to handle a broker failure between the ZooKeeper writes for a topic creation. I have updated the KIP to use the suggested approach.

On Wed, Jan 17, 2018 at 9:57 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much for the comments. Please see my comments inline.

On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. Looks good to me overall. Just a few minor comments.
>
> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition partition): It seems that there is no need to return metadata. We probably want to return sth like OffsetAndEpoch.

Previously I thought we might want to re-use the existing class to keep our consumer interface simpler. I have updated the KIP to add the class OffsetAndOffsetEpoch. I didn't use OffsetAndEpoch because users may confuse that name with OffsetEpoch. Does this sound OK?

> 61. Should we store partition_epoch in /brokers/topics/[topic]/partitions/[partitionId] in ZK?

I have considered this.
I think the advantage of adding the partition->partition_epoch map to the existing znode /brokers/topics/[topic]/partitions is that the controller only needs to read one znode per topic to get its partition_epoch information. Otherwise the controller may need to read one extra znode per partition to get the same information.

When we delete a partition or expand the partitions of a topic, someone needs to modify the partition->partition_epoch map in the znode /brokers/topics/[topic]/partitions. This may seem a bit more complicated than simply adding or deleting the znode /brokers/topics/[topic]/partitions/[partitionId]. But the complexity is probably similar to the existing operation of modifying the partition->replica_list mapping in the znode /brokers/topics/[topic]. So I am not sure it is better to store the partition_epoch in /brokers/topics/[topic]/partitions/[partitionId]. What do you think?

> 62. For checking outdated metadata in the client, we probably want to add when max_partition_epoch will be used.

The max_partition_epoch is used in the Proposed Changes -> Client's metadata refresh section to determine whether the metadata is outdated. This formula is also referenced and re-used in other sections for the same purpose. Does this formula look OK?

> 63. "The leader_epoch should be the largest leader_epoch of messages whose offset < the commit offset. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The partition_epoch should be read from the last FetchResponse corresponding to the given partition and commit offset.": leader_epoch and partition_epoch are associated with an offset. So, if no message is consumed, there is no offset and therefore there is no need to read leader_epoch and partition_epoch. Also, the leader_epoch associated with the offset should just come from the messages returned in the fetch response.

I am thinking that, if the user calls seek(...) and commitSync(...) without consuming any messages, we should re-use the leader_epoch and partition_epoch provided by the seek(...) in the OffsetCommitRequest. And if messages have been successfully consumed, then the leader_epoch will come from the messages returned in the fetch response. The condition "messages whose offset < the commit offset" is needed to take care of log compacted topics, which may have offset gaps due to log cleaning.

Did I miss something here? Or should I rephrase the paragraph to make it less confusing?

> 64. Could you include the public methods in the OffsetEpoch class?

I mistakenly deleted the definition of the OffsetEpoch class from the KIP. I just added it back with the public methods. Could you take another look?

> Jun

On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much.
I agree that we cannot rely on committed offsets always being deleted when we delete a topic. So it is necessary to use a per-partition epoch that does not change unless the partition is deleted. I also agree that it is very nice to be able to uniquely identify a message with (offset, leader_epoch, partition_epoch) in the face of potential topic deletion and unclean leader election.

I agree with all your comments, and I have updated the KIP based on our latest discussion. In addition, I added InvalidPartitionEpochException, which will be thrown by consumer.poll() if the partition_epoch associated with the partition, which can be given to the consumer using seek(...), is different from the partition_epoch in the FetchResponse.

Can you take another look at the latest KIP?

Thanks!
Dong

On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

My replies are the following.

60. What you described could also work. The drawback is that we will be unnecessarily changing the partition epoch when a partition hasn't really changed. I was imagining that the partition epoch will be stored in /brokers/topics/[topic]/partitions/[partitionId], instead of at the topic level. So, not sure if the ZK size limit is an issue.

61, 62 and 65.
To me, the offset + offset_epoch is a unique identifier for a message. So, if a message hasn't changed, the offset and the associated offset_epoch ideally should remain the same (it would be kind of weird if two consumer apps save the offset of the same message, but the offset_epochs are different). partition_epoch + leader_epoch give us that; global_epoch + leader_epoch don't. If we use this approach, we can more reliably solve not only the problem that you have identified, but also other problems involving data loss or topic re-creation. For example, in the future, if we include the partition_epoch and leader_epoch in the fetch request, the server can do a more reliable check of whether that offset is valid or not. I am not sure that we can rely upon all external offsets to be removed on topic deletion. For example, a topic may be deleted by an admin who may not know all the applications.

If we agree on the above, the second question is then how to reliably propagate the partition_epoch and the leader_epoch to the consumer when there are leader or partition changes. The leader_epoch comes from the message, which is reliable. So, I was suggesting that when we store an offset, we can just store the leader_epoch from the message set containing that offset.
Similarly, I was thinking that if the partition_epoch is in the fetch response, we can propagate the partition_epoch reliably when there is a partition_epoch change.

63. My point is that once a leader is producing a message in the new partition_epoch, ideally, we should associate the new offsets with the new partition_epoch. Otherwise, the offset_epoch won't be the correct unique identifier (useful for solving the other problems mentioned above). I was originally thinking that the leader will include the partition_epoch from the metadata cache in the fetch response. It's just that right now, the metadata cache is updated on UpdateMetadataRequest, which typically happens after the LeaderAndIsrRequest. Another approach is for the leader to cache the partition_epoch in the Partition object and return that (instead of the one in the metadata cache) in the fetch response.

65. It seems to me that the global_epoch and the partition_epoch have different purposes. A partition_epoch has the benefit that it (1) can be used to form a unique identifier for a message and (2) can be used to solve other corner case problems in the future. I am not sure having just a global_epoch can achieve these. A global_epoch is useful to determine which version of the metadata is newer, especially with topic deletion.
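The ordering property argued for here can be sketched as follows. The class and field names are assumptions for illustration, not KIP APIs:

```java
// Toy illustration of the point above: (partition_epoch, leader_epoch,
// offset) forms a totally ordered, unique identifier for a consumer
// position, even across topic deletion and re-creation. Names are
// assumptions, not part of the KIP.
class OffsetAndEpochs implements Comparable<OffsetAndEpochs> {
    final int partitionEpoch; // bumped when the partition is deleted and re-created
    final int leaderEpoch;    // bumped on each leader change
    final long offset;

    OffsetAndEpochs(int partitionEpoch, int leaderEpoch, long offset) {
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
        this.offset = offset;
    }

    // Compare partition_epoch first, then leader_epoch, then offset, so a
    // position saved before a topic re-creation always orders as older,
    // regardless of its (possibly larger) offset.
    @Override
    public int compareTo(OffsetAndEpochs other) {
        if (partitionEpoch != other.partitionEpoch)
            return Integer.compare(partitionEpoch, other.partitionEpoch);
        if (leaderEpoch != other.leaderEpoch)
            return Integer.compare(leaderEpoch, other.leaderEpoch);
        return Long.compare(offset, other.offset);
    }
}
```

A global_epoch would give the same ordering across metadata versions, but not a per-message identifier, which is the distinction point 65 draws.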
Thanks,

Jun

On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <lindon...@gmail.com> wrote:

Regarding the use of the global epoch in 65), it is very similar to the proposal of the metadata_epoch we discussed earlier. The main difference is that this epoch is incremented when we create/expand/delete a topic and does not change when the controller re-sends metadata.

I looked at our previous discussion. It seems that we preferred partition_epoch over metadata_epoch because 1) we prefer not to have an ever-growing metadata_epoch and 2) we can reset offsets better when a topic is re-created. The use of a global topic_epoch avoids the drawback of a quickly growing metadata_epoch. Though the global epoch does not allow us to recognize an invalid offset committed before the topic re-creation, we can probably just delete the offsets when we delete a topic. Thus I am not very sure whether it is still worthwhile to have a per-partition partition_epoch if the metadata already has the global epoch.

On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks so much. These comments are very useful.
Please see my comments below.

On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. A few more comments.
>
> 60. Perhaps having a partition epoch is more flexible since in the future, we may support deleting a partition as well.

Yeah, I have considered this. I think we can probably still support deleting a partition by using the topic_epoch -- when a partition of a topic is deleted or created, the epoch of all partitions of this topic will be incremented by 1. Therefore, if that partition is re-created later, the epoch of that partition will still be larger than its epoch before the deletion, which still allows the client to order the metadata for the purpose of this KIP. Does this sound reasonable?

The advantage of using a topic_epoch instead of a partition_epoch is that the size of the /brokers/topics/[topic] znode and the request/response size can be smaller. We have a limit on the maximum size of a znode (typically 1MB). Using a per-partition epoch would effectively reduce the number of partitions that can be described by the /brokers/topics/[topic] znode.
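For concreteness, the replica assignment stored in the /brokers/topics/[topic] znode looks roughly like the JSON below, and the per-partition alternative being weighed here would carry an epoch map alongside it. The partition_epochs field name is an assumption for illustration, not the KIP's schema:

```json
{
  "version": 1,
  "partitions": {"0": [1, 2, 3], "1": [2, 3, 1]},
  "partition_epochs": {"0": 7, "1": 7}
}
```

Under this sketch, deleting and re-creating partition 0 would bump only its entry in partition_epochs, at the cost of a larger znode, which is the size trade-off discussed above.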
One use-case of the partition_epoch is for the client to detect that the committed offset, whether from the Kafka offsets topic or from an external store, is invalid after partition deletion and re-creation. However, it seems that we can also address this use-case with other approaches. For example, when the AdminClient deletes partitions, it can also delete the committed offsets for those partitions from the offsets topic. If the user stores offsets externally, it might make sense for the user to similarly remove the offsets of the related partitions after these partitions are deleted. So I am not sure that we should use the partition_epoch in this KIP.

> 61. It seems that the leader epoch returned in the position() call should be the leader epoch returned in the fetch response, not the one in the metadata cache of the client.

I think this is a good idea. Just to double check, this change does not affect the correctness or performance of this KIP, but it can be useful if we want to use the leader_epoch to better handle the offset reset in case of unclean leader election, which is listed in the future work.
Is this understanding correct?

I have updated the KIP to specify that the leader_epoch returned by position() should be the largest leader_epoch of those already consumed messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek() or OffsetFetchResponse should be used. The offset included in the OffsetCommitRequest will also be determined in a similar manner.

> 62. I am wondering if we should return the partition epoch in the fetch response as well. In the current proposal, if a topic is recreated and the new leader is on the same broker as the old one, there is nothing to force the metadata refresh in the client. So, the client may still associate the offset with the old partition epoch.

Could you help me understand the problem if a client associates an old partition_epoch (or the topic_epoch as of the current KIP) with the offset? The main purpose of the topic_epoch is to be able to drop the leader_epoch to 0 after a partition is deleted and re-created.
I guess you may be thinking about using the partition_epoch to detect that the committed offset is invalid? In that case, I am wondering if the alternative approach described in 60) would be reasonable.

> 63. There is some subtle coordination between the LeaderAndIsrRequest and the UpdateMetadataRequest. Currently, when a leader changes, the controller first sends the LeaderAndIsrRequest to the assigned replicas and the UpdateMetadataRequest to every broker. So, there could be a small window when the leader has already received the new partition epoch in the LeaderAndIsrRequest, but the metadata cache in the broker hasn't been updated with the latest partition epoch. Not sure what's the best way to address this issue. Perhaps we can update the metadata cache on the broker with both the LeaderAndIsrRequest and the UpdateMetadataRequest. The challenge is that the two have slightly different data. For example, only the latter has all the endpoints.

I am not sure whether this is a problem. Could you explain a bit more what specific problem this small window can cause?
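Since metadata can reach the client in arbitrary order, the client-side check this thread converges on is to compare epochs and ignore anything that moves backwards. A rough sketch, with method and field names that are illustrative assumptions rather than the KIP's exact formula:

```java
import java.util.HashMap;
import java.util.Map;

// Rough sketch of a client-side "outdated metadata" check: a metadata
// response for a partition is ignored if any epoch it carries is lower than
// what the client has already seen. Names are assumptions for illustration.
class MetadataFreshness {
    // topic-partition -> last seen {partitionEpoch, leaderEpoch}
    private final Map<String, int[]> seen = new HashMap<>();

    // Returns true (and records the epochs) if the metadata for this
    // partition is at least as recent as anything seen before; false if it
    // is outdated and should be ignored.
    public boolean maybeUpdate(String topicPartition, int partitionEpoch, int leaderEpoch) {
        int[] prev = seen.get(topicPartition);
        if (prev != null) {
            if (partitionEpoch < prev[0])
                return false; // older topic generation: outdated
            if (partitionEpoch == prev[0] && leaderEpoch < prev[1])
                return false; // same generation but older leader: outdated
        }
        seen.put(topicPartition, new int[] {partitionEpoch, leaderEpoch});
        return true;
    }
}
```

With such a check on the client, it matters less in which order a given broker applied the LeaderAndIsrRequest and the UpdateMetadataRequest, which is the point Dong makes next.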
Since the client can fetch metadata from any broker in the cluster, and given that different brokers receive requests (e.g. LeaderAndIsrRequest and UpdateMetadataRequest) in arbitrary order, the metadata received by the client can be in arbitrary order (either newer or older) compared to a broker's leadership state, even if a given broker receives the LeaderAndIsrRequest and the UpdateMetadataRequest simultaneously. So I am not sure it is useful to update the broker's cache with the LeaderAndIsrRequest.

> 64. The enforcement of leader epoch in offset commit: We allow a consumer to set an arbitrary offset. So it's possible for offsets or leader epoch to go backwards. I am not sure if we could always enforce that the leader epoch only goes up on the broker.

Sure. I have removed this check from the KIP.

BTW, we can probably still ensure that the leader_epoch always increases if the leader_epoch used with the offset commit is the max(leader_epoch of the message with offset = the committed offset - 1, the largest known leader_epoch from the metadata). But I don't have a good use-case for this alternative definition.
So I chose to keep the KIP simple by not requiring the leader_epoch to always increase.

> 65. Good point on handling missing partition epoch due to topic deletion. Another potential way to address this is to additionally propagate the global partition epoch to brokers and the clients. This way, when a partition epoch is missing, we can use the global partition epoch to reason about which metadata is more recent.

This is a great idea. The global epoch can be used to order the metadata and help us recognize the more recent metadata if a topic (or partition) is deleted and re-created.

Actually, it seems we only need to propagate the global epoch to brokers and clients, without propagating this epoch on a per-topic or per-partition basis. Doing so would simplify the interface changes made in this KIP. Does this approach sound reasonable?

> 66. A client may also get an offset by time using the offsetsForTimes() api. So, we probably want to include offsetInternalMetadata in OffsetAndTimestamp as well.

You are right. This probably requires us to change the ListOffsetRequest as well.
I will update the KIP after we agree on the solution for 65).

> 67. InternalMetadata can be a bit confusing with the metadata field already there. Perhaps we can just call it OffsetEpoch. It might be useful to make OffsetEpoch printable at least for debugging purposes. Once you do that, we are already exposing the internal fields. So, not sure if it's worth hiding them. If we do want to hide them, perhaps we can have sth like the following. The binary encoding is probably more efficient than JSON for external storage.
>
> OffsetEpoch {
>     static OffsetEpoch decode(byte[]);
>
>     public byte[] encode();
>
>     public String toString();
> }

Thanks much. I like this solution. I have updated the KIP accordingly.

> Jun

On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Certainly. This sounds good. I have updated the KIP to clarify that the global epoch will be incremented by 1 each time a topic is deleted.
Thanks,
Dong

On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Dong,

> I think your approach will allow the user to distinguish between the metadata before and after the topic deletion. I also agree that this can potentially be useful to the user. I am just not very sure whether we already have a good use-case to make the additional complexity worthwhile. It seems that this feature is kind of independent of the main problem of this KIP. Could we add this as future work?

Do you think it's fair if we bump the topic epoch on deletion and leave propagation of the epoch for deleted topics for future work? I don't think this adds much complexity and it makes the behavior consistent: every topic mutation results in an epoch bump.
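The "every topic mutation results in an epoch bump" rule could be sketched as a single, monotonically increasing controller-side counter. This is a toy; all names are assumptions, and the KIP defines the real mechanism:

```java
// Toy sketch of a global epoch bumped on every topic mutation
// (create/expand/delete) and stamped into metadata. Names are assumptions.
class GlobalTopicEpoch {
    private long epoch = 0;

    public long onTopicCreated()    { return ++epoch; }
    public long onPartitionsAdded() { return ++epoch; }
    public long onTopicDeleted()    { return ++epoch; }

    // Metadata stamped with a larger epoch is newer, even if the topic was
    // deleted and re-created in between.
    public static boolean isNewer(long candidate, long seen) {
        return candidate > seen;
    }
}
```

Because deletion also bumps the counter, metadata produced after a topic re-creation always orders as newer than metadata from before, which is the consistency Jason is asking for.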
Thanks,
Jason

On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Ismael,

I guess we actually need the user to see this field so that the user can store this value in the external store together with the offset. We just prefer the value to be opaque to discourage most users from interpreting it. One more advantage of using such an opaque field is being able to evolve the information (or schema) of this value without changing the consumer API in the future.

I also think it is probably OK for the user to be able to interpret this value, particularly for those advanced users.
Thanks,
Dong

On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk> wrote:

On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <ja...@confluent.io> wrote:

> class OffsetAndMetadata {
>   long offset;
>   byte[] offsetMetadata;
>   String metadata;
> }
>
> Admittedly, the naming is a bit annoying, but we can probably come up with something better. Internally the byte array would have a version. If in the future we have anything else we need to add, we can update the version and we wouldn't need any new APIs.

We can also add fields to a class in a compatible way. So, it seems to me that the main advantage of the byte array is that it's opaque to the user. Is that correct? If so, we could also add any opaque metadata in a subclass so that users don't even see it (unless they cast it, but then they're on their own).

Ismael

> The corresponding seek() and position() APIs might look something like this:
>
> void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
> byte[] positionMetadata(TopicPartition partition);
>
> What do you think?
>
> Thanks,
> Jason

On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun, Jason,

Thanks much for all the feedback. I have updated the KIP based on the latest discussion. Can you help check whether it looks good?
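The versioned, opaque byte[] that Jason describes could be sketched roughly as below. The concrete field layout (a version prefix followed by partition_epoch and leader_epoch) is an assumption for illustration only; the thread deliberately keeps the encoding internal to the consumer so the schema can evolve without new APIs.

```java
import java.nio.ByteBuffer;

// Illustrative sketch of the opaque, versioned offsetMetadata byte[].
// The (version, partition_epoch, leader_epoch) layout is assumed here,
// not specified by the KIP.
public class OffsetMetadataCodec {
    private static final short VERSION = 0;

    // Hide the epochs behind an opaque byte[]; the version prefix lets the
    // internal schema evolve later without changing consumer APIs.
    public static byte[] encode(int partitionEpoch, int leaderEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + 4);
        buf.putShort(VERSION);
        buf.putInt(partitionEpoch);
        buf.putInt(leaderEpoch);
        return buf.array();
    }

    // Decode back to {partition_epoch, leader_epoch}; callers would switch on
    // the version once more fields are added.
    public static int[] decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        short version = buf.getShort();
        if (version != VERSION)
            throw new IllegalArgumentException("Unknown version: " + version);
        return new int[] { buf.getInt(), buf.getInt() };
    }

    public static void main(String[] args) {
        byte[] opaque = encode(5, 7);
        int[] decoded = decode(opaque);
        System.out.println("partition_epoch=" + decoded[0]
                + " leader_epoch=" + decoded[1]);
    }
}
```

A static decode helper like this is also what the later part of the thread suggests for debugging, so that the array stays opaque in the interface while remaining printable.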
Thanks,
Dong

On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Hmm... thinking about this more, I am not sure that the proposed API is sufficient. For users that store offsets externally, we probably need an extra API to return the leader_epoch and partition_epoch for all partitions that consumers are consuming. I suppose these users currently use position() to get the offset. Thus we probably need a new method positionWithEpoch(..) to return <offset, partition_epoch, leader_epoch>. Does this sound reasonable?
Thanks,
Dong

On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Yes, that's what I am thinking. OffsetEpoch will be composed of (partition_epoch, leader_epoch).

Thanks,

Jun

On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much. I like the new API that you proposed. I am not sure what you exactly mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?
Thanks,
Dong

On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Got it. The api that you proposed works. The question is whether that's the api that we want to have in the long term. My concern is that while the api change is simple, the new api seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().

An alternative approach is to make the following compatible api changes in Consumer.
* Add an additional OffsetEpoch field in OffsetAndMetadata. (no need to change the commitSync() api)
* Add a new api seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old api seek(TopicPartition partition, long offset) in the future.

The alternative approach has a similar amount of api changes as yours but has the following benefits.
1. The api works in a similar way as how offset management works now and is probably what we want in the long term.
2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
3. It can reset offsets better when a topic is recreated.
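The OffsetEpoch being discussed could look roughly like the following sketch. The class shape and the ordering rule (partition_epoch dominating leader_epoch, since a recreated topic invalidates every leader epoch) are an illustration of the idea in this thread, not the KIP's final definition.

```java
// Illustrative sketch of an OffsetEpoch composed of
// (partition_epoch, leader_epoch); the comparison semantics are assumed.
public class OffsetEpoch implements Comparable<OffsetEpoch> {
    final int partitionEpoch;
    final int leaderEpoch;

    OffsetEpoch(int partitionEpoch, int leaderEpoch) {
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    // partition_epoch dominates: if the topic was recreated, the stored
    // leader_epoch no longer means anything.
    @Override
    public int compareTo(OffsetEpoch other) {
        if (partitionEpoch != other.partitionEpoch)
            return Integer.compare(partitionEpoch, other.partitionEpoch);
        return Integer.compare(leaderEpoch, other.leaderEpoch);
    }

    public static void main(String[] args) {
        OffsetEpoch committed = new OffsetEpoch(1, 4);
        OffsetEpoch broker    = new OffsetEpoch(2, 0); // topic was recreated
        // A committed epoch older than the broker's means the stored offset is
        // stale and should fall back to the offset reset policy.
        System.out.println("stale=" + (committed.compareTo(broker) < 0));
    }
}
```

Under the alternative API above, this value would travel inside OffsetAndMetadata on commitSync() and be passed to the new seek(partition, offset, offsetEpoch) overload.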
Thanks,

Jun

On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Yeah I agree that ideally we don't want an ever-growing global metadata version. I just think it may be more desirable to keep the consumer API simple.

In my current proposal, the metadata version returned in the fetch response will be stored together with the offset. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponse and FetchResponse ever received by this consumer.

We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>). If user calls commitSync(...) to commit offset 10 for a given partition, for most use-cases, this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If user calls commitSync(..) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use-case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?
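The bookkeeping described above might be sketched as follows. The class name and the metadata-string encoding are hypothetical; the metadataEpoch() accessor is the one this thread itself proposes.

```java
// Illustrative sketch of a consumer-side tracker for the largest
// metadata_epoch seen across MetadataResponse and FetchResponse.
public class MetadataEpochTracker {
    private int largestMetadataEpoch = -1;

    // Invoked for every MetadataResponse or FetchResponse the consumer receives.
    public void onResponse(int metadataEpoch) {
        largestMetadataEpoch = Math.max(largestMetadataEpoch, metadataEpoch);
    }

    // The proposed metadataEpoch() API: lets an advanced user read the epoch
    // to commit alongside an externally stored offset.
    public int metadataEpoch() {
        return largestMetadataEpoch;
    }

    // One hypothetical way an advanced user might fold the epoch into the
    // free-form OffsetAndMetadata.metadata string.
    public String metadataString() {
        return "metadata_epoch=" + largestMetadataEpoch;
    }

    public static void main(String[] args) {
        MetadataEpochTracker tracker = new MetadataEpochTracker();
        tracker.onResponse(3); // from a MetadataResponse
        tracker.onResponse(8); // from a FetchResponse
        tracker.onResponse(5); // a stale response arriving late is ignored
        System.out.println(tracker.metadataString()); // metadata_epoch=8
    }
}
```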
By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?

Thanks,
Dong

On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Not sure that I fully understand your latest suggestion. Returning an ever-growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored together with the offset if offsets are stored externally. If so, we also have to change the consumer API for commitSync() and need to worry about compatibility. If we don't store the metadata version together with the offset, on a consumer restart, it's not clear how we can ensure the metadata in the consumer is high enough, since there is no metadata version to compare with.
Thanks,

Jun

On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much for the explanation.

I understand the advantage of partition_epoch over metadata_epoch. My current concern is that the use of leader_epoch and the partition_epoch requires considerable changes to the consumer's public API to take care of the case where user stores offsets externally. For example, consumer.commitSync(..) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. consumer.seek(...) would also need leader_epoch and partition_epoch as parameters. Technically we can probably still make it work in a backward compatible manner after careful design and discussion. But these changes can make the consumer's interface unnecessarily complex for the many users who do not store offsets externally.

After thinking more about it, we can address all the problems discussed by only using the metadata_epoch, without introducing leader_epoch or the partition_epoch. The current KIP describes the changes to the consumer API and how the new API can be used if user stores offsets externally. In order to address the scenario you described earlier, we can include metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. Consumer remembers the largest metadata_epoch from all the FetchResponse it has received. The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponse and MetadataResponse ever received by this consumer.

The drawback of using only the metadata_epoch is that we cannot always do the smart offset reset in case of unclean leader election, which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting. In these cases, either the consumer is not directly affected by unclean leader election since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees OffsetOutOfRangeException. So I am not sure it is worth adding the leader_epoch to the consumer API to address the remaining corner case. What do you think?
Thanks,
Dong

On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the reply.

To solve the topic recreation issue, we could use either a global metadata version or a partition level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking. When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match those in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.
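The validity check outlined above could be sketched like this. The method name and exact comparison rules are illustrative assumptions, not what the KIP specifies.

```java
// Illustrative sketch of checking a stored <offset, leader epoch,
// partition epoch> tuple against the broker's current epochs. A
// partition_epoch mismatch marks the offset out of range even when the
// offset and leader_epoch happen to match, which catches topic recreation.
public class PartitionEpochCheck {
    static boolean offsetStillValid(int storedLeaderEpoch, int storedPartitionEpoch,
                                    int brokerLeaderEpoch, int brokerPartitionEpoch) {
        return storedPartitionEpoch == brokerPartitionEpoch
                && storedLeaderEpoch <= brokerLeaderEpoch;
    }

    public static void main(String[] args) {
        // Topic recreated: same leader epoch by coincidence, new partition epoch.
        boolean valid = offsetStillValid(3, 10, 3, 11);
        System.out.println(valid); // false -> fall back to the offset reset policy
    }
}
```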
Jun

On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.

I think the remaining question is how to address the scenario where the topic is deleted and re-created.
One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch), and it does not require any change to the message format or leader epoch. It also allows us to order the metadata in a straightforward manner, which may be useful in the future.
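The ordering property mentioned above can be sketched as a comparable commit tuple. The class name and field layout are illustrative assumptions, not part of the KIP:

```java
// Sketch: an offset committed together with the leader epoch and a global
// metadata version. Because the metadata version is totally ordered, two
// committed positions can be compared directly.
public class OffsetWithMetadataVersion implements Comparable<OffsetWithMetadataVersion> {
    final long offset;
    final int leaderEpoch;
    final long metadataVersion;

    public OffsetWithMetadataVersion(long offset, int leaderEpoch, long metadataVersion) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.metadataVersion = metadataVersion;
    }

    @Override
    public int compareTo(OffsetWithMetadataVersion o) {
        // The global metadata version dominates: it advances on every
        // cluster metadata change, including topic deletion/recreation.
        int c = Long.compare(metadataVersion, o.metadataVersion);
        if (c != 0) return c;
        c = Integer.compare(leaderEpoch, o.leaderEpoch);
        if (c != 0) return c;
        return Long.compare(offset, o.offset);
    }
}
```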
So it may be a better solution than generating a random partition epoch every time we create a partition. Does this sound reasonable?

Previously one concern with using the metadata version was that the consumer would be forced to refresh metadata even if the metadata version is increased due to topics that the consumer is not interested in. Now I realize that this is probably not a problem.
Currently the client will refresh metadata either due to an InvalidMetadataException in the response from the broker or due to metadata expiry. The addition of the metadata version should not increase the overhead of metadata refresh caused by InvalidMetadataException.
If the client refreshes metadata due to expiry and it receives metadata whose version is lower than the current metadata version, we can reject the metadata but still reset the metadata age, which essentially keeps the existing behavior in the client.

Thanks much,
Dong
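The reject-but-reset behavior described above can be sketched as follows, with illustrative names (this is not the actual Kafka client Metadata class):

```java
// Sketch: a metadata response whose version is lower than the one already
// held is rejected, but the metadata age is still reset so the periodic
// refresh timer behaves exactly as it does today.
public class ClientMetadata {
    private long currentVersion = 0;
    private long lastRefreshMs = 0;

    // Returns true if the response was accepted.
    public boolean maybeUpdate(long responseVersion, long nowMs) {
        lastRefreshMs = nowMs;          // reset the metadata age either way
        if (responseVersion < currentVersion) {
            return false;               // stale metadata: reject the update
        }
        currentVersion = responseVersion;
        return true;
    }

    public long version() {
        return currentVersion;
    }

    public long lastRefreshMs() {
        return lastRefreshMs;
    }
}
```

Resetting the age even on rejection matters: otherwise the client would immediately re-request metadata and could loop while a lagging broker keeps serving the older version.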