I think it is possible to move entirely to using partitionEpoch instead of (topic, partition) to identify a partition. Clients can obtain the partitionEpoch -> (topic, partition) mapping from the MetadataResponse. We would still need to figure out a way to assign a partitionEpoch to the existing partitions in the cluster, but that should be doable.
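To make that concrete, here is a minimal sketch of how a client could maintain such a mapping from refreshed metadata. The classes below are hypothetical client-side helpers, not the existing Kafka client internals or the final MetadataResponse schema; the only assumption taken from the KIP is a 4-byte partitionEpoch per partition.

import java.util.HashMap;
import java.util.Map;

// Sketch only: PartitionMetadata and PartitionEpochIndex are hypothetical
// client-side helpers, not part of the existing Kafka client API.
public final class PartitionEpochIndex {

    // Per-partition entry as a client might extract it from a MetadataResponse
    // that carries the proposed partitionEpoch field.
    public static final class PartitionMetadata {
        final String topic;
        final int partition;
        final int partitionEpoch;

        PartitionMetadata(String topic, int partition, int partitionEpoch) {
            this.topic = topic;
            this.partition = partition;
            this.partitionEpoch = partitionEpoch;
        }
    }

    private final Map<Integer, PartitionMetadata> byEpoch = new HashMap<>();

    // Rebuild the partitionEpoch -> (topic, partition) mapping on every metadata refresh.
    public void onMetadataRefresh(Iterable<PartitionMetadata> partitions) {
        byEpoch.clear();
        for (PartitionMetadata p : partitions) {
            byEpoch.put(p.partitionEpoch, p);
        }
    }

    // Resolve a partitionEpoch back to its (topic, partition). Returns null if the
    // epoch is unknown, e.g. the partition was deleted since the last refresh.
    public PartitionMetadata resolve(int partitionEpoch) {
        return byEpoch.get(partitionEpoch);
    }
}

As noted further down the thread, the MetadataResponse (and a few other messages) would still need to carry the topic name so that clients can go from name to epoch, but the other RPCs could then reference partitions by epoch alone.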
This is a good idea. I think it will save us some space in the request/response. The actual space saving in percentage probably depends on the amount of data and the number of partitions of the same topic. I just think we can do it in a separate KIP. On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <lindon...@gmail.com> wrote: > Hey Colin, > > I understand that the KIP will adds overhead by introducing per-partition > partitionEpoch. I am open to alternative solutions that does not incur > additional overhead. But I don't see a better way now. > > IMO the overhead in the FetchResponse may not be that much. We probably > should discuss the percentage increase rather than the absolute number > increase. Currently after KIP-227, per-partition header has 23 bytes. This > KIP adds another 4 bytes. Assume the records size is 10KB, the percentage > increase is 4 / (23 + 10000) = 0.03%. It seems negligible, right? > > I agree that we can probably save more space by using partition ID so that > we no longer needs the string topic name. The similar idea has also been > put in the Rejected Alternative section in KIP-227. While this idea is > promising, it seems orthogonal to the goal of this KIP. Given that there is > already many work to do in this KIP, maybe we can do the partition ID in a > separate KIP? > > Thanks, > Dong > > > > > > > > > > > On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <cmcc...@apache.org> wrote: > >> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote: >> > Hey Colin, >> > >> > >> > On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <cmcc...@apache.org> >> wrote: >> > >> > > On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote: >> > > > Hey Colin, >> > > > >> > > > Thanks for the comment. >> > > > >> > > > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <cmcc...@apache.org> >> > > wrote: >> > > > >> > > > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote: >> > > > > > Hey Colin, >> > > > > > >> > > > > > Thanks for reviewing the KIP. >> > > > > > >> > > > > > If I understand you right, you maybe suggesting that we can use >> a >> > > global >> > > > > > metadataEpoch that is incremented every time controller updates >> > > metadata. >> > > > > > The problem with this solution is that, if a topic is deleted >> and >> > > created >> > > > > > again, user will not know whether that the offset which is >> stored >> > > before >> > > > > > the topic deletion is no longer valid. This motivates the idea >> to >> > > include >> > > > > > per-partition partitionEpoch. Does this sound reasonable? >> > > > > >> > > > > Hi Dong, >> > > > > >> > > > > Perhaps we can store the last valid offset of each deleted topic >> in >> > > > > ZooKeeper. Then, when a topic with one of those names gets >> > > re-created, we >> > > > > can start the topic at the previous end offset rather than at 0. >> This >> > > > > preserves immutability. It is no more burdensome than having to >> > > preserve a >> > > > > "last epoch" for the deleted partition somewhere, right? >> > > > > >> > > > >> > > > My concern with this solution is that the number of zookeeper nodes >> get >> > > > more and more over time if some users keep deleting and creating >> topics. >> > > Do >> > > > you think this can be a problem? >> > > >> > > Hi Dong, >> > > >> > > We could expire the "partition tombstones" after an hour or so. In >> > > practice this would solve the issue for clients that like to destroy >> and >> > > re-create topics all the time. 
In any case, doesn't the current >> proposal >> > > add per-partition znodes as well that we have to track even after the >> > > partition is deleted? Or did I misunderstand that? >> > > >> > >> > Actually the current KIP does not add per-partition znodes. Could you >> > double check? I can fix the KIP wiki if there is anything misleading. >> >> Hi Dong, >> >> I double-checked the KIP, and I can see that you are in fact using a >> global counter for initializing partition epochs. So, you are correct, it >> doesn't add per-partition znodes for partitions that no longer exist. >> >> > >> > If we expire the "partition tomstones" after an hour, and the topic is >> > re-created after more than an hour since the topic deletion, then we are >> > back to the situation where user can not tell whether the topic has been >> > re-created or not, right? >> >> Yes, with an expiration period, it would not ensure immutability-- you >> could effectively reuse partition names and they would look the same. >> >> > >> > >> > > >> > > It's not really clear to me what should happen when a topic is >> destroyed >> > > and re-created with new data. Should consumers continue to be able to >> > > consume? We don't know where they stopped consuming from the previous >> > > incarnation of the topic, so messages may have been lost. Certainly >> > > consuming data from offset X of the new incarnation of the topic may >> give >> > > something totally different from what you would have gotten from >> offset X >> > > of the previous incarnation of the topic. >> > > >> > >> > With the current KIP, if a consumer consumes a topic based on the last >> > remembered (offset, partitionEpoch, leaderEpoch), and if the topic is >> > re-created, consume will throw InvalidPartitionEpochException because >> the >> > previous partitionEpoch will be different from the current >> partitionEpoch. >> > This is described in the Proposed Changes -> Consumption after topic >> > deletion in the KIP. I can improve the KIP if there is anything not >> clear. >> >> Thanks for the clarification. It sounds like what you really want is >> immutability-- i.e., to never "really" reuse partition identifiers. And >> you do this by making the partition name no longer the "real" identifier. >> >> My big concern about this KIP is that it seems like an anti-scalability >> feature. Now we are adding 4 extra bytes for every partition in the >> FetchResponse and Request, for example. That could be 40 kb per request, >> if the user has 10,000 partitions. And of course, the KIP also makes >> massive changes to UpdateMetadataRequest, MetadataResponse, >> OffsetCommitRequest, OffsetFetchResponse, LeaderAndIsrRequest, >> ListOffsetResponse, etc. which will also increase their size on the wire >> and in memory. >> >> One thing that we talked a lot about in the past is replacing partition >> names with IDs. IDs have a lot of really nice features. They take up much >> less space in memory than strings (especially 2-byte Java strings). They >> can often be allocated on the stack rather than the heap (important when >> you are dealing with hundreds of thousands of them). They can be >> efficiently deserialized and serialized. If we use 64-bit ones, we will >> never run out of IDs, which means that they can always be unique per >> partition. >> >> Given that the partition name is no longer the "real" identifier for >> partitions in the current KIP-232 proposal, why not just move to using >> partition IDs entirely instead of strings? 
You have to change all the >> messages anyway. There isn't much point any more to carrying around the >> partition name in every RPC, since you really need (name, epoch) to >> identify the partition. >> Probably the metadata response and a few other messages would have to >> still carry the partition name, to allow clients to go from name to id. >> But we could mostly forget about the strings. And then this would be a >> scalability improvement rather than a scalability problem. >> >> > >> > >> > > By choosing to reuse the same (topic, partition, offset) 3-tuple, we >> have >> > >> > chosen to give up immutability. That was a really bad decision. And >> now >> > > we have to worry about time dependencies, stale cached data, and all >> the >> > > rest. We can't completely fix this inside Kafka no matter what we do, >> > > because not all that cached data is inside Kafka itself. Some of it >> may be >> > > in systems that Kafka has sent data to, such as other daemons, SQL >> > > databases, streams, and so forth. >> > > >> > >> > The current KIP will uniquely identify a message using (topic, >> partition, >> > offset, partitionEpoch) 4-tuple. This addresses the message immutability >> > issue that you mentioned. Is there any corner case where the message >> > immutability is still not preserved with the current KIP? >> > >> > >> > > >> > > I guess the idea here is that mirror maker should work as expected >> when >> > > users destroy a topic and re-create it with the same name. That's >> kind of >> > > tough, though, since in that scenario, mirror maker probably should >> destroy >> > > and re-create the topic on the other end, too, right? Otherwise, >> what you >> > > end up with on the other end could be half of one incarnation of the >> topic, >> > > and half of another. >> > > >> > > What mirror maker really needs is to be able to follow a stream of >> events >> > > about the kafka cluster itself. We could have some master topic >> which is >> > > always present and which contains data about all topic deletions, >> > > creations, etc. Then MM can simply follow this topic and do what is >> needed. >> > > >> > > > >> > > > >> > > > > >> > > > > > >> > > > > > Then the next question maybe, should we use a global >> metadataEpoch + >> > > > > > per-partition partitionEpoch, instead of using per-partition >> > > leaderEpoch >> > > > > + >> > > > > > per-partition leaderEpoch. The former solution using >> metadataEpoch >> > > would >> > > > > > not work due to the following scenario (provided by Jun): >> > > > > > >> > > > > > "Consider the following scenario. In metadata v1, the leader >> for a >> > > > > > partition is at broker 1. In metadata v2, leader is at broker >> 2. In >> > > > > > metadata v3, leader is at broker 1 again. The last committed >> offset >> > > in >> > > > > v1, >> > > > > > v2 and v3 are 10, 20 and 30, respectively. A consumer is >> started and >> > > > > reads >> > > > > > metadata v1 and reads messages from offset 0 to 25 from broker >> 1. My >> > > > > > understanding is that in the current proposal, the metadata >> version >> > > > > > associated with offset 25 is v1. The consumer is then restarted >> and >> > > > > fetches >> > > > > > metadata v2. The consumer tries to read from broker 2, which is >> the >> > > old >> > > > > > leader with the last offset at 20. In this case, the consumer >> will >> > > still >> > > > > > get OffsetOutOfRangeException incorrectly." 
>> > > > > > >> > > > > > Regarding your comment "For the second purpose, this is "soft >> state" >> > > > > > anyway. If the client thinks X is the leader but Y is really >> the >> > > leader, >> > > > > > the client will talk to X, and X will point out its mistake by >> > > sending >> > > > > back >> > > > > > a NOT_LEADER_FOR_PARTITION.", it is probably no true. The >> problem >> > > here is >> > > > > > that the old leader X may still think it is the leader of the >> > > partition >> > > > > and >> > > > > > thus it will not send back NOT_LEADER_FOR_PARTITION. The reason >> is >> > > > > provided >> > > > > > in KAFKA-6262. Can you check if that makes sense? >> > > > > >> > > > > This is solvable with a timeout, right? If the leader can't >> > > communicate >> > > > > with the controller for a certain period of time, it should stop >> > > acting as >> > > > > the leader. We have to solve this problem, anyway, in order to >> fix >> > > all the >> > > > > corner cases. >> > > > > >> > > > >> > > > Not sure if I fully understand your proposal. The proposal seems to >> > > require >> > > > non-trivial changes to our existing leadership election mechanism. >> Could >> > > > you provide more detail regarding how it works? For example, how >> should >> > > > user choose this timeout, how leader determines whether it can still >> > > > communicate with controller, and how this triggers controller to >> elect >> > > new >> > > > leader? >> > > >> > > Before I come up with any proposal, let me make sure I understand the >> > > problem correctly. My big question was, what prevents split-brain >> here? >> > > >> > > Let's say I have a partition which is on nodes A, B, and C, with >> min-ISR >> > > 2. The controller is D. At some point, there is a network partition >> > > between A and B and the rest of the cluster. The Controller >> re-assigns the >> > > partition to nodes C, D, and E. But A and B keep chugging away, even >> > > though they can no longer communicate with the controller. >> > > >> > > At some point, a client with stale metadata writes to the partition. >> It >> > > still thinks the partition is on node A, B, and C, so that's where it >> sends >> > > the data. It's unable to talk to C, but A and B reply back that all >> is >> > > well. >> > > >> > > Is this not a case where we could lose data due to split brain? Or is >> > > there a mechanism for preventing this that I missed? If it is, it >> seems >> > > like a pretty serious failure case that we should be handling with our >> > > metadata rework. And I think epoch numbers and timeouts might be >> part of >> > > the solution. >> > > >> > >> > Right, split brain can happen if RF=4 and minIsr=2. However, I am not >> sure >> > it is a pretty serious issue which we need to address today. This can be >> > prevented by configuring the Kafka topic so that minIsr > RF/2. >> Actually, >> > if user sets minIsr=2, is there anything reason that user wants to set >> RF=4 >> > instead of 4? >> > >> > Introducing timeout in leader election mechanism is non-trivial. I >> think we >> > probably want to do that only if there is good use-case that can not >> > otherwise be addressed with the current mechanism. >> >> I still would like to think about these corner cases more. But perhaps >> it's not directly related to this KIP. 
>> >> regards, >> Colin >> >> >> > >> > >> > > best, >> > > Colin >> > > >> > > >> > > > >> > > > >> > > > > best, >> > > > > Colin >> > > > > >> > > > > > >> > > > > > Regards, >> > > > > > Dong >> > > > > > >> > > > > > >> > > > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe < >> cmcc...@apache.org> >> > > > > wrote: >> > > > > > >> > > > > > > Hi Dong, >> > > > > > > >> > > > > > > Thanks for proposing this KIP. I think a metadata epoch is a >> > > really >> > > > > good >> > > > > > > idea. >> > > > > > > >> > > > > > > I read through the DISCUSS thread, but I still don't have a >> clear >> > > > > picture >> > > > > > > of why the proposal uses a metadata epoch per partition rather >> > > than a >> > > > > > > global metadata epoch. A metadata epoch per partition is >> kind of >> > > > > > > unpleasant-- it's at least 4 extra bytes per partition that we >> > > have to >> > > > > send >> > > > > > > over the wire in every full metadata request, which could >> become >> > > extra >> > > > > > > kilobytes on the wire when the number of partitions becomes >> large. >> > > > > Plus, >> > > > > > > we have to update all the auxillary classes to include an >> epoch. >> > > > > > > >> > > > > > > We need to have a global metadata epoch anyway to handle >> partition >> > > > > > > addition and deletion. For example, if I give you >> > > > > > > MetadataResponse{part1,epoch 1, part2, epoch 1} and {part1, >> > > epoch1}, >> > > > > which >> > > > > > > MetadataResponse is newer? You have no way of knowing. It >> could >> > > be >> > > > > that >> > > > > > > part2 has just been created, and the response with 2 >> partitions is >> > > > > newer. >> > > > > > > Or it coudl be that part2 has just been deleted, and >> therefore the >> > > > > response >> > > > > > > with 1 partition is newer. You must have a global epoch to >> > > > > disambiguate >> > > > > > > these two cases. >> > > > > > > >> > > > > > > Previously, I worked on the Ceph distributed filesystem. >> Ceph had >> > > the >> > > > > > > concept of a map of the whole cluster, maintained by a few >> servers >> > > > > doing >> > > > > > > paxos. This map was versioned by a single 64-bit epoch number >> > > which >> > > > > > > increased on every change. It was propagated to clients >> through >> > > > > gossip. I >> > > > > > > wonder if something similar could work here? >> > > > > > > >> > > > > > > It seems like the the Kafka MetadataResponse serves two >> somewhat >> > > > > unrelated >> > > > > > > purposes. Firstly, it lets clients know what partitions >> exist in >> > > the >> > > > > > > system and where they live. Secondly, it lets clients know >> which >> > > nodes >> > > > > > > within the partition are in-sync (in the ISR) and which node >> is the >> > > > > leader. >> > > > > > > >> > > > > > > The first purpose is what you really need a metadata epoch >> for, I >> > > > > think. >> > > > > > > You want to know whether a partition exists or not, or you >> want to >> > > know >> > > > > > > which nodes you should talk to in order to write to a given >> > > > > partition. A >> > > > > > > single metadata epoch for the whole response should be >> adequate >> > > here. >> > > > > We >> > > > > > > should not change the partition assignment without going >> through >> > > > > zookeeper >> > > > > > > (or a similar system), and this inherently serializes updates >> into >> > > a >> > > > > > > numbered stream. 
Brokers should also stop responding to >> requests >> > > when >> > > > > they >> > > > > > > are unable to contact ZK for a certain time period. This >> prevents >> > > the >> > > > > case >> > > > > > > where a given partition has been moved off some set of nodes, >> but a >> > > > > client >> > > > > > > still ends up talking to those nodes and writing data there. >> > > > > > > >> > > > > > > For the second purpose, this is "soft state" anyway. If the >> client >> > > > > thinks >> > > > > > > X is the leader but Y is really the leader, the client will >> talk >> > > to X, >> > > > > and >> > > > > > > X will point out its mistake by sending back a >> > > > > NOT_LEADER_FOR_PARTITION. >> > > > > > > Then the client can update its metadata again and find the new >> > > leader, >> > > > > if >> > > > > > > there is one. There is no need for an epoch to handle this. >> > > > > Similarly, I >> > > > > > > can't think of a reason why changing the in-sync replica set >> needs >> > > to >> > > > > bump >> > > > > > > the epoch. >> > > > > > > >> > > > > > > best, >> > > > > > > Colin >> > > > > > > >> > > > > > > >> > > > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote: >> > > > > > > > Thanks much for reviewing the KIP! >> > > > > > > > >> > > > > > > > Dong >> > > > > > > > >> > > > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang < >> > > wangg...@gmail.com> >> > > > > > > wrote: >> > > > > > > > >> > > > > > > > > Yeah that makes sense, again I'm just making sure we >> understand >> > > > > all the >> > > > > > > > > scenarios and what to expect. >> > > > > > > > > >> > > > > > > > > I agree that if, more generally speaking, say users have >> only >> > > > > consumed >> > > > > > > to >> > > > > > > > > offset 8, and then call seek(16) to "jump" to a further >> > > position, >> > > > > then >> > > > > > > she >> > > > > > > > > needs to be aware that OORE maybe thrown and she needs to >> > > handle >> > > > > it or >> > > > > > > rely >> > > > > > > > > on reset policy which should not surprise her. >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > I'm +1 on the KIP. >> > > > > > > > > >> > > > > > > > > Guozhang >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin < >> > > lindon...@gmail.com> >> > > > > > > wrote: >> > > > > > > > > >> > > > > > > > > > Yes, in general we can not prevent >> OffsetOutOfRangeException >> > > if >> > > > > user >> > > > > > > > > seeks >> > > > > > > > > > to a wrong offset. The main goal is to prevent >> > > > > > > OffsetOutOfRangeException >> > > > > > > > > if >> > > > > > > > > > user has done things in the right way, e.g. user should >> know >> > > that >> > > > > > > there >> > > > > > > > > is >> > > > > > > > > > message with this offset. >> > > > > > > > > > >> > > > > > > > > > For example, if user calls seek(..) right after >> > > construction, the >> > > > > > > only >> > > > > > > > > > reason I can think of is that user stores offset >> externally. >> > > In >> > > > > this >> > > > > > > > > case, >> > > > > > > > > > user currently needs to use the offset which is obtained >> > > using >> > > > > > > > > position(..) >> > > > > > > > > > from the last run. With this KIP, user needs to get the >> > > offset >> > > > > and >> > > > > > > the >> > > > > > > > > > offsetEpoch using positionAndOffsetEpoch(...) and stores >> > > these >> > > > > > > > > information >> > > > > > > > > > externally. 
The next time user starts consumer, he/she >> needs >> > > to >> > > > > call >> > > > > > > > > > seek(..., offset, offsetEpoch) right after construction. >> > > Then KIP >> > > > > > > should >> > > > > > > > > be >> > > > > > > > > > able to ensure that we don't throw >> OffsetOutOfRangeException >> > > if >> > > > > > > there is >> > > > > > > > > no >> > > > > > > > > > unclean leader election. >> > > > > > > > > > >> > > > > > > > > > Does this sound OK? >> > > > > > > > > > >> > > > > > > > > > Regards, >> > > > > > > > > > Dong >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang < >> > > > > wangg...@gmail.com> >> > > > > > > > > > wrote: >> > > > > > > > > > >> > > > > > > > > > > "If consumer wants to consume message with offset 16, >> then >> > > > > consumer >> > > > > > > > > must >> > > > > > > > > > > have >> > > > > > > > > > > already fetched message with offset 15" >> > > > > > > > > > > >> > > > > > > > > > > --> this may not be always true right? What if >> consumer >> > > just >> > > > > call >> > > > > > > > > > seek(16) >> > > > > > > > > > > after construction and then poll without committed >> offset >> > > ever >> > > > > > > stored >> > > > > > > > > > > before? Admittedly it is rare but we do not >> programmably >> > > > > disallow >> > > > > > > it. >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > Guozhang >> > > > > > > > > > > >> > > > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin < >> > > > > lindon...@gmail.com> >> > > > > > > > > wrote: >> > > > > > > > > > > >> > > > > > > > > > > > Hey Guozhang, >> > > > > > > > > > > > >> > > > > > > > > > > > Thanks much for reviewing the KIP! >> > > > > > > > > > > > >> > > > > > > > > > > > In the scenario you described, let's assume that >> broker >> > > A has >> > > > > > > > > messages >> > > > > > > > > > > with >> > > > > > > > > > > > offset up to 10, and broker B has messages with >> offset >> > > up to >> > > > > 20. >> > > > > > > If >> > > > > > > > > > > > consumer wants to consume message with offset 9, it >> will >> > > not >> > > > > > > receive >> > > > > > > > > > > > OffsetOutOfRangeException >> > > > > > > > > > > > from broker A. >> > > > > > > > > > > > >> > > > > > > > > > > > If consumer wants to consume message with offset >> 16, then >> > > > > > > consumer >> > > > > > > > > must >> > > > > > > > > > > > have already fetched message with offset 15, which >> can >> > > only >> > > > > come >> > > > > > > from >> > > > > > > > > > > > broker B. Because consumer will fetch from broker B >> only >> > > if >> > > > > > > > > leaderEpoch >> > > > > > > > > > > >= >> > > > > > > > > > > > 2, then the current consumer leaderEpoch can not be >> 1 >> > > since >> > > > > this >> > > > > > > KIP >> > > > > > > > > > > > prevents leaderEpoch rewind. Thus we will not have >> > > > > > > > > > > > OffsetOutOfRangeException >> > > > > > > > > > > > in this case. >> > > > > > > > > > > > >> > > > > > > > > > > > Does this address your question, or maybe there is >> more >> > > > > advanced >> > > > > > > > > > scenario >> > > > > > > > > > > > that the KIP does not handle? 
>> > > > > > > > > > > > >> > > > > > > > > > > > Thanks, >> > > > > > > > > > > > Dong >> > > > > > > > > > > > >> > > > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang < >> > > > > > > wangg...@gmail.com> >> > > > > > > > > > > wrote: >> > > > > > > > > > > > >> > > > > > > > > > > > > Thanks Dong, I made a pass over the wiki and it >> lgtm. >> > > > > > > > > > > > > >> > > > > > > > > > > > > Just a quick question: can we completely >> eliminate the >> > > > > > > > > > > > > OffsetOutOfRangeException with this approach? Say >> if >> > > there >> > > > > is >> > > > > > > > > > > consecutive >> > > > > > > > > > > > > leader changes such that the cached metadata's >> > > partition >> > > > > epoch >> > > > > > > is >> > > > > > > > > 1, >> > > > > > > > > > > and >> > > > > > > > > > > > > the metadata fetch response returns with >> partition >> > > epoch 2 >> > > > > > > > > pointing >> > > > > > > > > > to >> > > > > > > > > > > > > leader broker A, while the actual up-to-date >> metadata >> > > has >> > > > > > > partition >> > > > > > > > > > > > epoch 3 >> > > > > > > > > > > > > whose leader is now broker B, the metadata >> refresh will >> > > > > still >> > > > > > > > > succeed >> > > > > > > > > > > and >> > > > > > > > > > > > > the follow-up fetch request may still see OORE? >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > Guozhang >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin < >> > > > > lindon...@gmail.com >> > > > > > > > >> > > > > > > > > > wrote: >> > > > > > > > > > > > > >> > > > > > > > > > > > > > Hi all, >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > I would like to start the voting process for >> KIP-232: >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > https://cwiki.apache.org/ >> > > confluence/display/KAFKA/KIP- >> > > > > > > > > > > > > > 232%3A+Detect+outdated+metadat >> a+using+leaderEpoch+ >> > > > > > > > > > and+partitionEpoch >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > The KIP will help fix a concurrency issue in >> Kafka >> > > which >> > > > > > > > > currently >> > > > > > > > > > > can >> > > > > > > > > > > > > > cause message loss or message duplication in >> > > consumer. >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > Regards, >> > > > > > > > > > > > > > Dong >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > -- >> > > > > > > > > > > > > -- Guozhang >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > -- >> > > > > > > > > > > -- Guozhang >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > -- >> > > > > > > > > -- Guozhang >> > > > > > > > > >> > > > > > > >> > > > > >> > > >> > >
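For reference, below is a minimal sketch of the externally-stored-offset flow discussed in the quoted thread, assuming the positionAndOffsetEpoch(...) and seek(..., offset, offsetEpoch) additions proposed in the KIP. The interface and value class are illustrative stand-ins, not the final consumer API.

// Sketch only: EpochAwareConsumer and OffsetAndEpoch are hypothetical stand-ins
// for the consumer API additions proposed in KIP-232.
public final class ExternalOffsetFlow {

    // Value returned by the proposed positionAndOffsetEpoch(...) call.
    public static final class OffsetAndEpoch {
        final long offset;
        final int offsetEpoch;

        OffsetAndEpoch(long offset, int offsetEpoch) {
            this.offset = offset;
            this.offsetEpoch = offsetEpoch;
        }
    }

    // Minimal consumer surface limited to the two calls used in this flow.
    public interface EpochAwareConsumer {
        OffsetAndEpoch positionAndOffsetEpoch(String topic, int partition);
        void seek(String topic, int partition, long offset, int offsetEpoch);
    }

    // On shutdown: persist the offset together with its offsetEpoch, not the offset alone.
    public static OffsetAndEpoch checkpoint(EpochAwareConsumer consumer, String topic, int partition) {
        return consumer.positionAndOffsetEpoch(topic, partition);
    }

    // On restart: seek with the stored epoch right after construction. With no unclean
    // leader election the consumer should then see no spurious OffsetOutOfRangeException,
    // and a re-created partition surfaces as InvalidPartitionEpochException instead.
    public static void restore(EpochAwareConsumer consumer, String topic, int partition, OffsetAndEpoch stored) {
        consumer.seek(topic, partition, stored.offset, stored.offsetEpoch);
    }
}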