Hey Jun,

It seems we have made considerable progress on the KIP-253 discussion since February. Do you think we should continue the discussion there, or can we resume the vote on this KIP? I am happy to submit the PR and move this KIP forward.
Thanks! Dong On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote: > Hey Jun, > > Sure, I will come up with a KIP this week. I think there is a way to allow > partition expansion to arbitrary number without introducing new concepts > such as read-only partition or repartition epoch. > > Thanks, > Dong > > On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote: > >> Hi, Dong, >> >> Thanks for the reply. The general idea that you had for adding partitions >> is similar to what we had in mind. It would be useful to make this more >> general, allowing adding an arbitrary number of partitions (instead of >> just >> doubling) and potentially removing partitions as well. The following is >> the >> high level idea from the discussion with Colin, Jason and Ismael. >> >> * To change the number of partitions from X to Y in a topic, the >> controller >> marks all existing X partitions as read-only and creates Y new partitions. >> The new partitions are writable and are tagged with a higher repartition >> epoch (RE). >> >> * The controller propagates the new metadata to every broker. Once the >> leader of a partition is marked as read-only, it rejects the produce >> requests on this partition. The producer will then refresh the metadata >> and >> start publishing to the new writable partitions. >> >> * The consumers will then be consuming messages in RE order. The consumer >> coordinator will only assign partitions in the same RE to consumers. Only >> after all messages in an RE are consumed, will partitions in a higher RE >> be >> assigned to consumers. >> >> As Colin mentioned, if we do the above, we could potentially (1) use a >> globally unique partition id, or (2) use a globally unique topic id to >> distinguish recreated partitions due to topic deletion. >> >> So, perhaps we can sketch out the re-partitioning KIP a bit more and see >> if >> there is any overlap with KIP-232. Would you be interested in doing that? >> If not, we can do that next week. >> >> Jun >> >> >> On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com> wrote: >> >> > Hey Jun, >> > >> > Interestingly I am also planning to sketch a KIP to allow partition >> > expansion for keyed topics after this KIP. Since you are already doing >> > that, I guess I will just share my high level idea here in case it is >> > helpful. >> > >> > The motivation for the KIP is that we currently lose order guarantee for >> > messages with the same key if we expand partitions of keyed topic. >> > >> > The solution can probably be built upon the following ideas: >> > >> > - Partition number of the keyed topic should always be doubled (or >> > multiplied by power of 2). Given that we select a partition based on >> > hash(key) % partitionNum, this should help us ensure that, a message >> > assigned to an existing partition will not be mapped to another existing >> > partition after partition expansion. >> > >> > - Producer includes in the ProduceRequest some information that helps >> > ensure that messages produced ti a partition will monotonically >> increase in >> > the partitionNum of the topic. In other words, if broker receives a >> > ProduceRequest and notices that the producer does not know the partition >> > number has increased, broker should reject this request. That >> "information" >> > maybe leaderEpoch, max partitionEpoch of the partitions of the topic, or >> > simply partitionNum of the topic. 
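To illustrate the doubling rule above: with the usual hash(key) % partitionNum assignment, growing a topic from N to 2N partitions sends each key either to its old partition p or to the new partition p + N, never to a different pre-existing partition. A minimal sketch of that property (helper names are hypothetical, and Objects.hashCode stands in for the producer's real key hash):

```java
import java.util.Objects;

public class PartitionDoublingSketch {
    // Conceptual stand-in for hash(key) % partitionNum; the real producer
    // hashes serialized key bytes, but the modular property is the same.
    static int partitionFor(String key, int partitionCount) {
        return (Objects.hashCode(key) & 0x7fffffff) % partitionCount;
    }

    public static void main(String[] args) {
        int oldCount = 8;
        int newCount = 2 * oldCount; // partition count is only ever doubled
        for (String key : new String[] {"user-1", "user-2", "order-42"}) {
            int before = partitionFor(key, oldCount);
            int after = partitionFor(key, newCount);
            // after is either the old partition or old partition + oldCount,
            // so a key never lands in a *different* pre-existing partition.
            System.out.printf("%s: %d -> %d%n", key, before, after);
            if (after != before && after != before + oldCount) {
                throw new AssertionError("doubling property violated");
            }
        }
    }
}
```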
The benefit of this property is that >> we >> > can keep the new logic for in-order message consumption entirely in how >> > consumer leader determines the partition -> consumer mapping. >> > >> > - When consumer leader determines partition -> consumer mapping, leader >> > first reads the start position for each partition using >> OffsetFetchRequest. >> > If start position are all non-zero, then assignment can be done in its >> > current manner. The assumption is that, a message in the new partition >> > should only be consumed after all messages with the same key produced >> > before it has been consumed. Since some messages in the new partition >> has >> > been consumed, we should not worry about consuming messages >> out-of-order. >> > This benefit of this approach is that we can avoid unnecessary overhead >> in >> > the common case. >> > >> > - If the consumer leader finds that the start position for some >> partition >> > is 0. Say the current partition number is 18 and the partition index is >> 12, >> > then consumer leader should ensure that messages produced to partition >> 12 - >> > 18/2 = 3 before the first message of partition 12 is consumed, before it >> > assigned partition 12 to any consumer in the consumer group. Since we >> have >> > a "information" that is monotonically increasing per partition, consumer >> > can read the value of this information from the first message in >> partition >> > 12, get the offset corresponding to this value in partition 3, assign >> > partition except for partition 12 (and probably other new partitions) to >> > the existing consumers, waiting for the committed offset to go beyond >> this >> > offset for partition 3, and trigger rebalance again so that partition 3 >> can >> > be reassigned to some consumer. >> > >> > >> > Thanks, >> > Dong >> > >> > >> > On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote: >> > >> > > Hi, Dong, >> > > >> > > Thanks for the KIP. It looks good overall. We are working on a >> separate >> > KIP >> > > for adding partitions while preserving the ordering guarantees. That >> may >> > > require another flavor of partition epoch. It's not very clear whether >> > that >> > > partition epoch can be merged with the partition epoch in this KIP. >> So, >> > > perhaps you can wait on this a bit until we post the other KIP in the >> > next >> > > few days. >> > > >> > > Jun >> > > >> > > >> > > >> > > On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com> >> wrote: >> > > >> > > > +1 on the KIP. >> > > > >> > > > I think the KIP is mainly about adding the capability of tracking >> the >> > > > system state change lineage. It does not seem necessary to bundle >> this >> > > KIP >> > > > with replacing the topic partition with partition epoch in >> > produce/fetch. >> > > > Replacing topic-partition string with partition epoch is >> essentially a >> > > > performance improvement on top of this KIP. That can probably be >> done >> > > > separately. 
>> > > > >> > > > Thanks, >> > > > >> > > > Jiangjie (Becket) Qin >> > > > >> > > > On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com> >> > wrote: >> > > > >> > > > > Hey Colin, >> > > > > >> > > > > On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe < >> cmcc...@apache.org> >> > > > wrote: >> > > > > >> > > > > > > On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin < >> lindon...@gmail.com> >> > > > > wrote: >> > > > > > > >> > > > > > > > Hey Colin, >> > > > > > > > >> > > > > > > > I understand that the KIP will adds overhead by introducing >> > > > > > per-partition >> > > > > > > > partitionEpoch. I am open to alternative solutions that does >> > not >> > > > > incur >> > > > > > > > additional overhead. But I don't see a better way now. >> > > > > > > > >> > > > > > > > IMO the overhead in the FetchResponse may not be that much. >> We >> > > > > probably >> > > > > > > > should discuss the percentage increase rather than the >> absolute >> > > > > number >> > > > > > > > increase. Currently after KIP-227, per-partition header has >> 23 >> > > > bytes. >> > > > > > This >> > > > > > > > KIP adds another 4 bytes. Assume the records size is 10KB, >> the >> > > > > > percentage >> > > > > > > > increase is 4 / (23 + 10000) = 0.03%. It seems negligible, >> > right? >> > > > > > >> > > > > > Hi Dong, >> > > > > > >> > > > > > Thanks for the response. I agree that the FetchRequest / >> > > FetchResponse >> > > > > > overhead should be OK, now that we have incremental fetch >> requests >> > > and >> > > > > > responses. However, there are a lot of cases where the >> percentage >> > > > > increase >> > > > > > is much greater. For example, if a client is doing full >> > > > > MetadataRequests / >> > > > > > Responses, we have some math kind of like this per partition: >> > > > > > >> > > > > > > UpdateMetadataRequestPartitionState => topic partition >> > > > > controller_epoch >> > > > > > leader leader_epoch partition_epoch isr zk_version replicas >> > > > > > offline_replicas >> > > > > > > 14 bytes: topic => string (assuming about 10 byte topic >> names) >> > > > > > > 4 bytes: partition => int32 >> > > > > > > 4 bytes: conroller_epoch => int32 >> > > > > > > 4 bytes: leader => int32 >> > > > > > > 4 bytes: leader_epoch => int32 >> > > > > > > +4 EXTRA bytes: partition_epoch => int32 <-- NEW >> > > > > > > 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR) >> > > > > > > 4 bytes: zk_version => int32 >> > > > > > > 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas) >> > > > > > > 2 offline_replicas => [int32] (assuming no offline replicas) >> > > > > > >> > > > > > Assuming I added that up correctly, the per-partition overhead >> goes >> > > > from >> > > > > > 64 bytes per partition to 68, a 6.2% increase. >> > > > > > >> > > > > > We could do similar math for a lot of the other RPCs. And you >> will >> > > > have >> > > > > a >> > > > > > similar memory and garbage collection impact on the brokers >> since >> > you >> > > > > have >> > > > > > to store all this extra state as well. >> > > > > > >> > > > > >> > > > > That is correct. IMO the Metadata is only updated periodically >> and is >> > > > > probably not a big deal if we increase it by 6%. The FetchResponse >> > and >> > > > > ProduceRequest are probably the only requests that are bounded by >> the >> > > > > bandwidth throughput. 
>> > > > > >> > > > > >> > > > > > >> > > > > > > > >> > > > > > > > I agree that we can probably save more space by using >> partition >> > > ID >> > > > so >> > > > > > that >> > > > > > > > we no longer needs the string topic name. The similar idea >> has >> > > also >> > > > > > been >> > > > > > > > put in the Rejected Alternative section in KIP-227. While >> this >> > > idea >> > > > > is >> > > > > > > > promising, it seems orthogonal to the goal of this KIP. >> Given >> > > that >> > > > > > there is >> > > > > > > > already many work to do in this KIP, maybe we can do the >> > > partition >> > > > ID >> > > > > > in a >> > > > > > > > separate KIP? >> > > > > > >> > > > > > I guess my thinking is that the goal here is to replace an >> > identifier >> > > > > > which can be re-used (the tuple of topic name, partition ID) >> with >> > an >> > > > > > identifier that cannot be re-used (the tuple of topic name, >> > partition >> > > > ID, >> > > > > > partition epoch) in order to gain better semantics. As long as >> we >> > > are >> > > > > > replacing the identifier, why not replace it with an identifier >> > that >> > > > has >> > > > > > important performance advantages? The KIP freeze for the next >> > > release >> > > > > has >> > > > > > already passed, so there is time to do this. >> > > > > > >> > > > > >> > > > > In general it can be easier for discussion and implementation if >> we >> > can >> > > > > split a larger task into smaller and independent tasks. For >> example, >> > > > > KIP-112 and KIP-113 both deals with the JBOD support. KIP-31, >> KIP-32 >> > > and >> > > > > KIP-33 are about timestamp support. The option on this can be >> subject >> > > > > though. >> > > > > >> > > > > IMO the change to switch from (topic, partition ID) to >> partitionEpch >> > in >> > > > all >> > > > > request/response requires us to going through all request one by >> one. >> > > It >> > > > > may not be hard but it can be time consuming and tedious. At high >> > level >> > > > the >> > > > > goal and the change for that will be orthogonal to the changes >> > required >> > > > in >> > > > > this KIP. That is the main reason I think we can split them into >> two >> > > > KIPs. >> > > > > >> > > > > >> > > > > > On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote: >> > > > > > > I think it is possible to move to entirely use partitionEpoch >> > > instead >> > > > > of >> > > > > > > (topic, partition) to identify a partition. Client can obtain >> the >> > > > > > > partitionEpoch -> (topic, partition) mapping from >> > MetadataResponse. >> > > > We >> > > > > > > probably need to figure out a way to assign partitionEpoch to >> > > > existing >> > > > > > > partitions in the cluster. But this should be doable. >> > > > > > > >> > > > > > > This is a good idea. I think it will save us some space in the >> > > > > > > request/response. The actual space saving in percentage >> probably >> > > > > depends >> > > > > > on >> > > > > > > the amount of data and the number of partitions of the same >> > topic. >> > > I >> > > > > just >> > > > > > > think we can do it in a separate KIP. >> > > > > > >> > > > > > Hmm. How much extra work would be required? It seems like we >> are >> > > > > already >> > > > > > changing almost every RPC that involves topics and partitions, >> > > already >> > > > > > adding new per-partition state to ZooKeeper, already changing >> how >> > > > clients >> > > > > > interact with partitions. 
Is there some other big piece of work >> > we'd >> > > > > have >> > > > > > to do to move to partition IDs that we wouldn't need for >> partition >> > > > > epochs? >> > > > > > I guess we'd have to find a way to support regular >> expression-based >> > > > topic >> > > > > > subscriptions. If we split this into multiple KIPs, wouldn't we >> > end >> > > up >> > > > > > changing all that RPCs and ZK state a second time? Also, I'm >> > curious >> > > > if >> > > > > > anyone has done any proof of concept GC, memory, and network >> usage >> > > > > > measurements on switching topic names for topic IDs. >> > > > > > >> > > > > >> > > > > >> > > > > We will need to go over all requests/responses to check how to >> > replace >> > > > > (topic, partition ID) with partition epoch. It requires >> non-trivial >> > > work >> > > > > and could take time. As you mentioned, we may want to see how much >> > > saving >> > > > > we can get by switching from topic names to partition epoch. That >> > > itself >> > > > > requires time and experiment. It seems that the new idea does not >> > > > rollback >> > > > > any change proposed in this KIP. So I am not sure we can get much >> by >> > > > > putting them into the same KIP. >> > > > > >> > > > > Anyway, if more people are interested in seeing the new idea in >> the >> > > same >> > > > > KIP, I can try that. >> > > > > >> > > > > >> > > > > >> > > > > > >> > > > > > best, >> > > > > > Colin >> > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe < >> > > cmcc...@apache.org >> > > > > >> > > > > > wrote: >> > > > > > > > >> > > > > > > >> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote: >> > > > > > > >> > Hey Colin, >> > > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe < >> > > > > cmcc...@apache.org> >> > > > > > > >> wrote: >> > > > > > > >> > >> > > > > > > >> > > On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote: >> > > > > > > >> > > > Hey Colin, >> > > > > > > >> > > > >> > > > > > > >> > > > Thanks for the comment. >> > > > > > > >> > > > >> > > > > > > >> > > > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe < >> > > > > > cmcc...@apache.org> >> > > > > > > >> > > wrote: >> > > > > > > >> > > > >> > > > > > > >> > > > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote: >> > > > > > > >> > > > > > Hey Colin, >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > Thanks for reviewing the KIP. >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > If I understand you right, you maybe suggesting >> that >> > > we >> > > > > can >> > > > > > use >> > > > > > > >> a >> > > > > > > >> > > global >> > > > > > > >> > > > > > metadataEpoch that is incremented every time >> > > controller >> > > > > > updates >> > > > > > > >> > > metadata. >> > > > > > > >> > > > > > The problem with this solution is that, if a >> topic >> > is >> > > > > > deleted >> > > > > > > >> and >> > > > > > > >> > > created >> > > > > > > >> > > > > > again, user will not know whether that the offset >> > > which >> > > > is >> > > > > > > >> stored >> > > > > > > >> > > before >> > > > > > > >> > > > > > the topic deletion is no longer valid. This >> > motivates >> > > > the >> > > > > > idea >> > > > > > > >> to >> > > > > > > >> > > include >> > > > > > > >> > > > > > per-partition partitionEpoch. 
Does this sound >> > > > reasonable? >> > > > > > > >> > > > > >> > > > > > > >> > > > > Hi Dong, >> > > > > > > >> > > > > >> > > > > > > >> > > > > Perhaps we can store the last valid offset of each >> > > deleted >> > > > > > topic >> > > > > > > >> in >> > > > > > > >> > > > > ZooKeeper. Then, when a topic with one of those >> names >> > > > gets >> > > > > > > >> > > re-created, we >> > > > > > > >> > > > > can start the topic at the previous end offset >> rather >> > > than >> > > > > at >> > > > > > 0. >> > > > > > > >> This >> > > > > > > >> > > > > preserves immutability. It is no more burdensome >> than >> > > > > having >> > > > > > to >> > > > > > > >> > > preserve a >> > > > > > > >> > > > > "last epoch" for the deleted partition somewhere, >> > right? >> > > > > > > >> > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > My concern with this solution is that the number of >> > > > zookeeper >> > > > > > nodes >> > > > > > > >> get >> > > > > > > >> > > > more and more over time if some users keep deleting >> and >> > > > > creating >> > > > > > > >> topics. >> > > > > > > >> > > Do >> > > > > > > >> > > > you think this can be a problem? >> > > > > > > >> > > >> > > > > > > >> > > Hi Dong, >> > > > > > > >> > > >> > > > > > > >> > > We could expire the "partition tombstones" after an >> hour >> > or >> > > > so. >> > > > > > In >> > > > > > > >> > > practice this would solve the issue for clients that >> like >> > to >> > > > > > destroy >> > > > > > > >> and >> > > > > > > >> > > re-create topics all the time. In any case, doesn't >> the >> > > > current >> > > > > > > >> proposal >> > > > > > > >> > > add per-partition znodes as well that we have to track >> > even >> > > > > after >> > > > > > the >> > > > > > > >> > > partition is deleted? Or did I misunderstand that? >> > > > > > > >> > > >> > > > > > > >> > >> > > > > > > >> > Actually the current KIP does not add per-partition >> znodes. >> > > > Could >> > > > > > you >> > > > > > > >> > double check? I can fix the KIP wiki if there is anything >> > > > > > misleading. >> > > > > > > >> >> > > > > > > >> Hi Dong, >> > > > > > > >> >> > > > > > > >> I double-checked the KIP, and I can see that you are in >> fact >> > > > using a >> > > > > > > >> global counter for initializing partition epochs. So, you >> are >> > > > > > correct, it >> > > > > > > >> doesn't add per-partition znodes for partitions that no >> longer >> > > > > exist. >> > > > > > > >> >> > > > > > > >> > >> > > > > > > >> > If we expire the "partition tomstones" after an hour, and >> > the >> > > > > topic >> > > > > > is >> > > > > > > >> > re-created after more than an hour since the topic >> deletion, >> > > > then >> > > > > > we are >> > > > > > > >> > back to the situation where user can not tell whether the >> > > topic >> > > > > has >> > > > > > been >> > > > > > > >> > re-created or not, right? >> > > > > > > >> >> > > > > > > >> Yes, with an expiration period, it would not ensure >> > > immutability-- >> > > > > you >> > > > > > > >> could effectively reuse partition names and they would look >> > the >> > > > > same. >> > > > > > > >> >> > > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > > >> > > > > > > >> > > It's not really clear to me what should happen when a >> > topic >> > > is >> > > > > > > >> destroyed >> > > > > > > >> > > and re-created with new data. Should consumers >> continue >> > to >> > > be >> > > > > > able to >> > > > > > > >> > > consume? 
We don't know where they stopped consuming >> from >> > > the >> > > > > > previous >> > > > > > > >> > > incarnation of the topic, so messages may have been >> lost. >> > > > > > Certainly >> > > > > > > >> > > consuming data from offset X of the new incarnation of >> the >> > > > topic >> > > > > > may >> > > > > > > >> give >> > > > > > > >> > > something totally different from what you would have >> > gotten >> > > > from >> > > > > > > >> offset X >> > > > > > > >> > > of the previous incarnation of the topic. >> > > > > > > >> > > >> > > > > > > >> > >> > > > > > > >> > With the current KIP, if a consumer consumes a topic >> based >> > on >> > > > the >> > > > > > last >> > > > > > > >> > remembered (offset, partitionEpoch, leaderEpoch), and if >> the >> > > > topic >> > > > > > is >> > > > > > > >> > re-created, consume will throw >> > InvalidPartitionEpochException >> > > > > > because >> > > > > > > >> the >> > > > > > > >> > previous partitionEpoch will be different from the >> current >> > > > > > > >> partitionEpoch. >> > > > > > > >> > This is described in the Proposed Changes -> Consumption >> > after >> > > > > topic >> > > > > > > >> > deletion in the KIP. I can improve the KIP if there is >> > > anything >> > > > > not >> > > > > > > >> clear. >> > > > > > > >> >> > > > > > > >> Thanks for the clarification. It sounds like what you >> really >> > > want >> > > > > is >> > > > > > > >> immutability-- i.e., to never "really" reuse partition >> > > > identifiers. >> > > > > > And >> > > > > > > >> you do this by making the partition name no longer the >> "real" >> > > > > > identifier. >> > > > > > > >> >> > > > > > > >> My big concern about this KIP is that it seems like an >> > > > > > anti-scalability >> > > > > > > >> feature. Now we are adding 4 extra bytes for every >> partition >> > in >> > > > the >> > > > > > > >> FetchResponse and Request, for example. That could be 40 >> kb >> > per >> > > > > > request, >> > > > > > > >> if the user has 10,000 partitions. And of course, the KIP >> > also >> > > > > makes >> > > > > > > >> massive changes to UpdateMetadataRequest, MetadataResponse, >> > > > > > > >> OffsetCommitRequest, OffsetFetchResponse, >> LeaderAndIsrRequest, >> > > > > > > >> ListOffsetResponse, etc. which will also increase their >> size >> > on >> > > > the >> > > > > > wire >> > > > > > > >> and in memory. >> > > > > > > >> >> > > > > > > >> One thing that we talked a lot about in the past is >> replacing >> > > > > > partition >> > > > > > > >> names with IDs. IDs have a lot of really nice features. >> They >> > > > take >> > > > > > up much >> > > > > > > >> less space in memory than strings (especially 2-byte Java >> > > > strings). >> > > > > > They >> > > > > > > >> can often be allocated on the stack rather than the heap >> > > > (important >> > > > > > when >> > > > > > > >> you are dealing with hundreds of thousands of them). They >> can >> > > be >> > > > > > > >> efficiently deserialized and serialized. If we use 64-bit >> > ones, >> > > > we >> > > > > > will >> > > > > > > >> never run out of IDs, which means that they can always be >> > unique >> > > > per >> > > > > > > >> partition. >> > > > > > > >> >> > > > > > > >> Given that the partition name is no longer the "real" >> > identifier >> > > > for >> > > > > > > >> partitions in the current KIP-232 proposal, why not just >> move >> > to >> > > > > using >> > > > > > > >> partition IDs entirely instead of strings? 
You have to >> change >> > > all >> > > > > the >> > > > > > > >> messages anyway. There isn't much point any more to >> carrying >> > > > around >> > > > > > the >> > > > > > > >> partition name in every RPC, since you really need (name, >> > epoch) >> > > > to >> > > > > > > >> identify the partition. >> > > > > > > >> Probably the metadata response and a few other messages >> would >> > > have >> > > > > to >> > > > > > > >> still carry the partition name, to allow clients to go from >> > name >> > > > to >> > > > > > id. >> > > > > > > >> But we could mostly forget about the strings. And then >> this >> > > would >> > > > > be >> > > > > > a >> > > > > > > >> scalability improvement rather than a scalability problem. >> > > > > > > >> >> > > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > > By choosing to reuse the same (topic, partition, >> offset) >> > > > > 3-tuple, >> > > > > > we >> > > > > > > >> have >> > > > > > > >> > >> > > > > > > >> > chosen to give up immutability. That was a really bad >> > > decision. >> > > > > > And >> > > > > > > >> now >> > > > > > > >> > > we have to worry about time dependencies, stale cached >> > data, >> > > > and >> > > > > > all >> > > > > > > >> the >> > > > > > > >> > > rest. We can't completely fix this inside Kafka no >> matter >> > > > what >> > > > > > we do, >> > > > > > > >> > > because not all that cached data is inside Kafka >> itself. >> > > Some >> > > > > of >> > > > > > it >> > > > > > > >> may be >> > > > > > > >> > > in systems that Kafka has sent data to, such as other >> > > daemons, >> > > > > SQL >> > > > > > > >> > > databases, streams, and so forth. >> > > > > > > >> > > >> > > > > > > >> > >> > > > > > > >> > The current KIP will uniquely identify a message using >> > (topic, >> > > > > > > >> partition, >> > > > > > > >> > offset, partitionEpoch) 4-tuple. This addresses the >> message >> > > > > > immutability >> > > > > > > >> > issue that you mentioned. Is there any corner case where >> the >> > > > > message >> > > > > > > >> > immutability is still not preserved with the current KIP? >> > > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > > >> > > > > > > >> > > I guess the idea here is that mirror maker should work >> as >> > > > > expected >> > > > > > > >> when >> > > > > > > >> > > users destroy a topic and re-create it with the same >> name. >> > > > > That's >> > > > > > > >> kind of >> > > > > > > >> > > tough, though, since in that scenario, mirror maker >> > probably >> > > > > > should >> > > > > > > >> destroy >> > > > > > > >> > > and re-create the topic on the other end, too, right? >> > > > > Otherwise, >> > > > > > > >> what you >> > > > > > > >> > > end up with on the other end could be half of one >> > > incarnation >> > > > of >> > > > > > the >> > > > > > > >> topic, >> > > > > > > >> > > and half of another. >> > > > > > > >> > > >> > > > > > > >> > > What mirror maker really needs is to be able to follow >> a >> > > > stream >> > > > > of >> > > > > > > >> events >> > > > > > > >> > > about the kafka cluster itself. We could have some >> master >> > > > topic >> > > > > > > >> which is >> > > > > > > >> > > always present and which contains data about all topic >> > > > > deletions, >> > > > > > > >> > > creations, etc. Then MM can simply follow this topic >> and >> > do >> > > > > what >> > > > > > is >> > > > > > > >> needed. 
>> > > > > > > >> > > >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > Then the next question maybe, should we use a >> global >> > > > > > > >> metadataEpoch + >> > > > > > > >> > > > > > per-partition partitionEpoch, instead of using >> > > > > per-partition >> > > > > > > >> > > leaderEpoch >> > > > > > > >> > > > > + >> > > > > > > >> > > > > > per-partition leaderEpoch. The former solution >> using >> > > > > > > >> metadataEpoch >> > > > > > > >> > > would >> > > > > > > >> > > > > > not work due to the following scenario (provided >> by >> > > > Jun): >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > "Consider the following scenario. In metadata v1, >> > the >> > > > > leader >> > > > > > > >> for a >> > > > > > > >> > > > > > partition is at broker 1. In metadata v2, leader >> is >> > at >> > > > > > broker >> > > > > > > >> 2. In >> > > > > > > >> > > > > > metadata v3, leader is at broker 1 again. The >> last >> > > > > committed >> > > > > > > >> offset >> > > > > > > >> > > in >> > > > > > > >> > > > > v1, >> > > > > > > >> > > > > > v2 and v3 are 10, 20 and 30, respectively. A >> > consumer >> > > is >> > > > > > > >> started and >> > > > > > > >> > > > > reads >> > > > > > > >> > > > > > metadata v1 and reads messages from offset 0 to >> 25 >> > > from >> > > > > > broker >> > > > > > > >> 1. My >> > > > > > > >> > > > > > understanding is that in the current proposal, >> the >> > > > > metadata >> > > > > > > >> version >> > > > > > > >> > > > > > associated with offset 25 is v1. The consumer is >> > then >> > > > > > restarted >> > > > > > > >> and >> > > > > > > >> > > > > fetches >> > > > > > > >> > > > > > metadata v2. The consumer tries to read from >> broker >> > 2, >> > > > > > which is >> > > > > > > >> the >> > > > > > > >> > > old >> > > > > > > >> > > > > > leader with the last offset at 20. In this case, >> the >> > > > > > consumer >> > > > > > > >> will >> > > > > > > >> > > still >> > > > > > > >> > > > > > get OffsetOutOfRangeException incorrectly." >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > Regarding your comment "For the second purpose, >> this >> > > is >> > > > > > "soft >> > > > > > > >> state" >> > > > > > > >> > > > > > anyway. If the client thinks X is the leader >> but Y >> > is >> > > > > > really >> > > > > > > >> the >> > > > > > > >> > > leader, >> > > > > > > >> > > > > > the client will talk to X, and X will point out >> its >> > > > > mistake >> > > > > > by >> > > > > > > >> > > sending >> > > > > > > >> > > > > back >> > > > > > > >> > > > > > a NOT_LEADER_FOR_PARTITION.", it is probably no >> > true. >> > > > The >> > > > > > > >> problem >> > > > > > > >> > > here is >> > > > > > > >> > > > > > that the old leader X may still think it is the >> > leader >> > > > of >> > > > > > the >> > > > > > > >> > > partition >> > > > > > > >> > > > > and >> > > > > > > >> > > > > > thus it will not send back >> NOT_LEADER_FOR_PARTITION. >> > > The >> > > > > > reason >> > > > > > > >> is >> > > > > > > >> > > > > provided >> > > > > > > >> > > > > > in KAFKA-6262. Can you check if that makes sense? >> > > > > > > >> > > > > >> > > > > > > >> > > > > This is solvable with a timeout, right? If the >> leader >> > > > can't >> > > > > > > >> > > communicate >> > > > > > > >> > > > > with the controller for a certain period of time, >> it >> > > > should >> > > > > > stop >> > > > > > > >> > > acting as >> > > > > > > >> > > > > the leader. 
We have to solve this problem, >> anyway, in >> > > > order >> > > > > > to >> > > > > > > >> fix >> > > > > > > >> > > all the >> > > > > > > >> > > > > corner cases. >> > > > > > > >> > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > Not sure if I fully understand your proposal. The >> > proposal >> > > > > > seems to >> > > > > > > >> > > require >> > > > > > > >> > > > non-trivial changes to our existing leadership >> election >> > > > > > mechanism. >> > > > > > > >> Could >> > > > > > > >> > > > you provide more detail regarding how it works? For >> > > example, >> > > > > how >> > > > > > > >> should >> > > > > > > >> > > > user choose this timeout, how leader determines >> whether >> > it >> > > > can >> > > > > > still >> > > > > > > >> > > > communicate with controller, and how this triggers >> > > > controller >> > > > > to >> > > > > > > >> elect >> > > > > > > >> > > new >> > > > > > > >> > > > leader? >> > > > > > > >> > > >> > > > > > > >> > > Before I come up with any proposal, let me make sure I >> > > > > understand >> > > > > > the >> > > > > > > >> > > problem correctly. My big question was, what prevents >> > > > > split-brain >> > > > > > > >> here? >> > > > > > > >> > > >> > > > > > > >> > > Let's say I have a partition which is on nodes A, B, >> and >> > C, >> > > > with >> > > > > > > >> min-ISR >> > > > > > > >> > > 2. The controller is D. At some point, there is a >> > network >> > > > > > partition >> > > > > > > >> > > between A and B and the rest of the cluster. The >> > Controller >> > > > > > > >> re-assigns the >> > > > > > > >> > > partition to nodes C, D, and E. But A and B keep >> chugging >> > > > away, >> > > > > > even >> > > > > > > >> > > though they can no longer communicate with the >> controller. >> > > > > > > >> > > >> > > > > > > >> > > At some point, a client with stale metadata writes to >> the >> > > > > > partition. >> > > > > > > >> It >> > > > > > > >> > > still thinks the partition is on node A, B, and C, so >> > that's >> > > > > > where it >> > > > > > > >> sends >> > > > > > > >> > > the data. It's unable to talk to C, but A and B reply >> > back >> > > > that >> > > > > > all >> > > > > > > >> is >> > > > > > > >> > > well. >> > > > > > > >> > > >> > > > > > > >> > > Is this not a case where we could lose data due to >> split >> > > > brain? >> > > > > > Or is >> > > > > > > >> > > there a mechanism for preventing this that I missed? >> If >> > it >> > > > is, >> > > > > it >> > > > > > > >> seems >> > > > > > > >> > > like a pretty serious failure case that we should be >> > > handling >> > > > > > with our >> > > > > > > >> > > metadata rework. And I think epoch numbers and >> timeouts >> > > might >> > > > > be >> > > > > > > >> part of >> > > > > > > >> > > the solution. >> > > > > > > >> > > >> > > > > > > >> > >> > > > > > > >> > Right, split brain can happen if RF=4 and minIsr=2. >> > However, I >> > > > am >> > > > > > not >> > > > > > > >> sure >> > > > > > > >> > it is a pretty serious issue which we need to address >> today. >> > > > This >> > > > > > can be >> > > > > > > >> > prevented by configuring the Kafka topic so that minIsr > >> > > RF/2. >> > > > > > > >> Actually, >> > > > > > > >> > if user sets minIsr=2, is there anything reason that user >> > > wants >> > > > to >> > > > > > set >> > > > > > > >> RF=4 >> > > > > > > >> > instead of 4? >> > > > > > > >> > >> > > > > > > >> > Introducing timeout in leader election mechanism is >> > > > non-trivial. 
I >> > > > > > > >> think we >> > > > > > > >> > probably want to do that only if there is good use-case >> that >> > > can >> > > > > not >> > > > > > > >> > otherwise be addressed with the current mechanism. >> > > > > > > >> >> > > > > > > >> I still would like to think about these corner cases more. >> > But >> > > > > > perhaps >> > > > > > > >> it's not directly related to this KIP. >> > > > > > > >> >> > > > > > > >> regards, >> > > > > > > >> Colin >> > > > > > > >> >> > > > > > > >> >> > > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > > best, >> > > > > > > >> > > Colin >> > > > > > > >> > > >> > > > > > > >> > > >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > best, >> > > > > > > >> > > > > Colin >> > > > > > > >> > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > Regards, >> > > > > > > >> > > > > > Dong >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe < >> > > > > > > >> cmcc...@apache.org> >> > > > > > > >> > > > > wrote: >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > > Hi Dong, >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > Thanks for proposing this KIP. I think a >> metadata >> > > > epoch >> > > > > > is a >> > > > > > > >> > > really >> > > > > > > >> > > > > good >> > > > > > > >> > > > > > > idea. >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > I read through the DISCUSS thread, but I still >> > don't >> > > > > have >> > > > > > a >> > > > > > > >> clear >> > > > > > > >> > > > > picture >> > > > > > > >> > > > > > > of why the proposal uses a metadata epoch per >> > > > partition >> > > > > > rather >> > > > > > > >> > > than a >> > > > > > > >> > > > > > > global metadata epoch. A metadata epoch per >> > > partition >> > > > > is >> > > > > > > >> kind of >> > > > > > > >> > > > > > > unpleasant-- it's at least 4 extra bytes per >> > > partition >> > > > > > that we >> > > > > > > >> > > have to >> > > > > > > >> > > > > send >> > > > > > > >> > > > > > > over the wire in every full metadata request, >> > which >> > > > > could >> > > > > > > >> become >> > > > > > > >> > > extra >> > > > > > > >> > > > > > > kilobytes on the wire when the number of >> > partitions >> > > > > > becomes >> > > > > > > >> large. >> > > > > > > >> > > > > Plus, >> > > > > > > >> > > > > > > we have to update all the auxillary classes to >> > > include >> > > > > an >> > > > > > > >> epoch. >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > We need to have a global metadata epoch anyway >> to >> > > > handle >> > > > > > > >> partition >> > > > > > > >> > > > > > > addition and deletion. For example, if I give >> you >> > > > > > > >> > > > > > > MetadataResponse{part1,epoch 1, part2, epoch 1} >> > and >> > > > > > {part1, >> > > > > > > >> > > epoch1}, >> > > > > > > >> > > > > which >> > > > > > > >> > > > > > > MetadataResponse is newer? You have no way of >> > > > knowing. >> > > > > > It >> > > > > > > >> could >> > > > > > > >> > > be >> > > > > > > >> > > > > that >> > > > > > > >> > > > > > > part2 has just been created, and the response >> > with 2 >> > > > > > > >> partitions is >> > > > > > > >> > > > > newer. >> > > > > > > >> > > > > > > Or it coudl be that part2 has just been >> deleted, >> > and >> > > > > > > >> therefore the >> > > > > > > >> > > > > response >> > > > > > > >> > > > > > > with 1 partition is newer. 
You must have a >> global >> > > > epoch >> > > > > > to >> > > > > > > >> > > > > disambiguate >> > > > > > > >> > > > > > > these two cases. >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > Previously, I worked on the Ceph distributed >> > > > filesystem. >> > > > > > > >> Ceph had >> > > > > > > >> > > the >> > > > > > > >> > > > > > > concept of a map of the whole cluster, >> maintained >> > > by a >> > > > > few >> > > > > > > >> servers >> > > > > > > >> > > > > doing >> > > > > > > >> > > > > > > paxos. This map was versioned by a single >> 64-bit >> > > > epoch >> > > > > > number >> > > > > > > >> > > which >> > > > > > > >> > > > > > > increased on every change. It was propagated >> to >> > > > clients >> > > > > > > >> through >> > > > > > > >> > > > > gossip. I >> > > > > > > >> > > > > > > wonder if something similar could work here? >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > It seems like the the Kafka MetadataResponse >> > serves >> > > > two >> > > > > > > >> somewhat >> > > > > > > >> > > > > unrelated >> > > > > > > >> > > > > > > purposes. Firstly, it lets clients know what >> > > > partitions >> > > > > > > >> exist in >> > > > > > > >> > > the >> > > > > > > >> > > > > > > system and where they live. Secondly, it lets >> > > clients >> > > > > > know >> > > > > > > >> which >> > > > > > > >> > > nodes >> > > > > > > >> > > > > > > within the partition are in-sync (in the ISR) >> and >> > > > which >> > > > > > node >> > > > > > > >> is the >> > > > > > > >> > > > > leader. >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > The first purpose is what you really need a >> > metadata >> > > > > epoch >> > > > > > > >> for, I >> > > > > > > >> > > > > think. >> > > > > > > >> > > > > > > You want to know whether a partition exists or >> > not, >> > > or >> > > > > you >> > > > > > > >> want to >> > > > > > > >> > > know >> > > > > > > >> > > > > > > which nodes you should talk to in order to >> write >> > to >> > > a >> > > > > > given >> > > > > > > >> > > > > partition. A >> > > > > > > >> > > > > > > single metadata epoch for the whole response >> > should >> > > be >> > > > > > > >> adequate >> > > > > > > >> > > here. >> > > > > > > >> > > > > We >> > > > > > > >> > > > > > > should not change the partition assignment >> without >> > > > going >> > > > > > > >> through >> > > > > > > >> > > > > zookeeper >> > > > > > > >> > > > > > > (or a similar system), and this inherently >> > > serializes >> > > > > > updates >> > > > > > > >> into >> > > > > > > >> > > a >> > > > > > > >> > > > > > > numbered stream. Brokers should also stop >> > > responding >> > > > to >> > > > > > > >> requests >> > > > > > > >> > > when >> > > > > > > >> > > > > they >> > > > > > > >> > > > > > > are unable to contact ZK for a certain time >> > period. >> > > > > This >> > > > > > > >> prevents >> > > > > > > >> > > the >> > > > > > > >> > > > > case >> > > > > > > >> > > > > > > where a given partition has been moved off some >> > set >> > > of >> > > > > > nodes, >> > > > > > > >> but a >> > > > > > > >> > > > > client >> > > > > > > >> > > > > > > still ends up talking to those nodes and >> writing >> > > data >> > > > > > there. >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > For the second purpose, this is "soft state" >> > anyway. 
>> > > > If >> > > > > > the >> > > > > > > >> client >> > > > > > > >> > > > > thinks >> > > > > > > >> > > > > > > X is the leader but Y is really the leader, the >> > > client >> > > > > > will >> > > > > > > >> talk >> > > > > > > >> > > to X, >> > > > > > > >> > > > > and >> > > > > > > >> > > > > > > X will point out its mistake by sending back a >> > > > > > > >> > > > > NOT_LEADER_FOR_PARTITION. >> > > > > > > >> > > > > > > Then the client can update its metadata again >> and >> > > find >> > > > > > the new >> > > > > > > >> > > leader, >> > > > > > > >> > > > > if >> > > > > > > >> > > > > > > there is one. There is no need for an epoch to >> > > handle >> > > > > > this. >> > > > > > > >> > > > > Similarly, I >> > > > > > > >> > > > > > > can't think of a reason why changing the >> in-sync >> > > > replica >> > > > > > set >> > > > > > > >> needs >> > > > > > > >> > > to >> > > > > > > >> > > > > bump >> > > > > > > >> > > > > > > the epoch. >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > best, >> > > > > > > >> > > > > > > Colin >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote: >> > > > > > > >> > > > > > > > Thanks much for reviewing the KIP! >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > Dong >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang >> Wang < >> > > > > > > >> > > wangg...@gmail.com> >> > > > > > > >> > > > > > > wrote: >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > > Yeah that makes sense, again I'm just >> making >> > > sure >> > > > we >> > > > > > > >> understand >> > > > > > > >> > > > > all the >> > > > > > > >> > > > > > > > > scenarios and what to expect. >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > > > I agree that if, more generally speaking, >> say >> > > > users >> > > > > > have >> > > > > > > >> only >> > > > > > > >> > > > > consumed >> > > > > > > >> > > > > > > to >> > > > > > > >> > > > > > > > > offset 8, and then call seek(16) to "jump" >> to >> > a >> > > > > > further >> > > > > > > >> > > position, >> > > > > > > >> > > > > then >> > > > > > > >> > > > > > > she >> > > > > > > >> > > > > > > > > needs to be aware that OORE maybe thrown >> and >> > she >> > > > > > needs to >> > > > > > > >> > > handle >> > > > > > > >> > > > > it or >> > > > > > > >> > > > > > > rely >> > > > > > > >> > > > > > > > > on reset policy which should not surprise >> her. >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > > > I'm +1 on the KIP. >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > > > Guozhang >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin >> < >> > > > > > > >> > > lindon...@gmail.com> >> > > > > > > >> > > > > > > wrote: >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > > > > Yes, in general we can not prevent >> > > > > > > >> OffsetOutOfRangeException >> > > > > > > >> > > if >> > > > > > > >> > > > > user >> > > > > > > >> > > > > > > > > seeks >> > > > > > > >> > > > > > > > > > to a wrong offset. The main goal is to >> > prevent >> > > > > > > >> > > > > > > OffsetOutOfRangeException >> > > > > > > >> > > > > > > > > if >> > > > > > > >> > > > > > > > > > user has done things in the right way, >> e.g. 
>> > > user >> > > > > > should >> > > > > > > >> know >> > > > > > > >> > > that >> > > > > > > >> > > > > > > there >> > > > > > > >> > > > > > > > > is >> > > > > > > >> > > > > > > > > > message with this offset. >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > > > For example, if user calls seek(..) right >> > > after >> > > > > > > >> > > construction, the >> > > > > > > >> > > > > > > only >> > > > > > > >> > > > > > > > > > reason I can think of is that user stores >> > > offset >> > > > > > > >> externally. >> > > > > > > >> > > In >> > > > > > > >> > > > > this >> > > > > > > >> > > > > > > > > case, >> > > > > > > >> > > > > > > > > > user currently needs to use the offset >> which >> > > is >> > > > > > obtained >> > > > > > > >> > > using >> > > > > > > >> > > > > > > > > position(..) >> > > > > > > >> > > > > > > > > > from the last run. With this KIP, user >> needs >> > > to >> > > > > get >> > > > > > the >> > > > > > > >> > > offset >> > > > > > > >> > > > > and >> > > > > > > >> > > > > > > the >> > > > > > > >> > > > > > > > > > offsetEpoch using >> > positionAndOffsetEpoch(...) >> > > > and >> > > > > > stores >> > > > > > > >> > > these >> > > > > > > >> > > > > > > > > information >> > > > > > > >> > > > > > > > > > externally. The next time user starts >> > > consumer, >> > > > > > he/she >> > > > > > > >> needs >> > > > > > > >> > > to >> > > > > > > >> > > > > call >> > > > > > > >> > > > > > > > > > seek(..., offset, offsetEpoch) right >> after >> > > > > > construction. >> > > > > > > >> > > Then KIP >> > > > > > > >> > > > > > > should >> > > > > > > >> > > > > > > > > be >> > > > > > > >> > > > > > > > > > able to ensure that we don't throw >> > > > > > > >> OffsetOutOfRangeException >> > > > > > > >> > > if >> > > > > > > >> > > > > > > there is >> > > > > > > >> > > > > > > > > no >> > > > > > > >> > > > > > > > > > unclean leader election. >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > > > Does this sound OK? >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > > > Regards, >> > > > > > > >> > > > > > > > > > Dong >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM, >> Guozhang >> > > Wang >> > > > < >> > > > > > > >> > > > > wangg...@gmail.com> >> > > > > > > >> > > > > > > > > > wrote: >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > > > > "If consumer wants to consume message >> with >> > > > > offset >> > > > > > 16, >> > > > > > > >> then >> > > > > > > >> > > > > consumer >> > > > > > > >> > > > > > > > > must >> > > > > > > >> > > > > > > > > > > have >> > > > > > > >> > > > > > > > > > > already fetched message with offset 15" >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > --> this may not be always true right? >> > What >> > > if >> > > > > > > >> consumer >> > > > > > > >> > > just >> > > > > > > >> > > > > call >> > > > > > > >> > > > > > > > > > seek(16) >> > > > > > > >> > > > > > > > > > > after construction and then poll >> without >> > > > > committed >> > > > > > > >> offset >> > > > > > > >> > > ever >> > > > > > > >> > > > > > > stored >> > > > > > > >> > > > > > > > > > > before? Admittedly it is rare but we do >> > not >> > > > > > > >> programmably >> > > > > > > >> > > > > disallow >> > > > > > > >> > > > > > > it. 
>> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > Guozhang >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong >> > Lin < >> > > > > > > >> > > > > lindon...@gmail.com> >> > > > > > > >> > > > > > > > > wrote: >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > Hey Guozhang, >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > Thanks much for reviewing the KIP! >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > In the scenario you described, let's >> > > assume >> > > > > that >> > > > > > > >> broker >> > > > > > > >> > > A has >> > > > > > > >> > > > > > > > > messages >> > > > > > > >> > > > > > > > > > > with >> > > > > > > >> > > > > > > > > > > > offset up to 10, and broker B has >> > messages >> > > > > with >> > > > > > > >> offset >> > > > > > > >> > > up to >> > > > > > > >> > > > > 20. >> > > > > > > >> > > > > > > If >> > > > > > > >> > > > > > > > > > > > consumer wants to consume message >> with >> > > > offset >> > > > > > 9, it >> > > > > > > >> will >> > > > > > > >> > > not >> > > > > > > >> > > > > > > receive >> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException >> > > > > > > >> > > > > > > > > > > > from broker A. >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > If consumer wants to consume message >> > with >> > > > > offset >> > > > > > > >> 16, then >> > > > > > > >> > > > > > > consumer >> > > > > > > >> > > > > > > > > must >> > > > > > > >> > > > > > > > > > > > have already fetched message with >> offset >> > > 15, >> > > > > > which >> > > > > > > >> can >> > > > > > > >> > > only >> > > > > > > >> > > > > come >> > > > > > > >> > > > > > > from >> > > > > > > >> > > > > > > > > > > > broker B. Because consumer will fetch >> > from >> > > > > > broker B >> > > > > > > >> only >> > > > > > > >> > > if >> > > > > > > >> > > > > > > > > leaderEpoch >> > > > > > > >> > > > > > > > > > > >= >> > > > > > > >> > > > > > > > > > > > 2, then the current consumer >> leaderEpoch >> > > can >> > > > > > not be >> > > > > > > >> 1 >> > > > > > > >> > > since >> > > > > > > >> > > > > this >> > > > > > > >> > > > > > > KIP >> > > > > > > >> > > > > > > > > > > > prevents leaderEpoch rewind. Thus we >> > will >> > > > not >> > > > > > have >> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException >> > > > > > > >> > > > > > > > > > > > in this case. >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > Does this address your question, or >> > maybe >> > > > > there >> > > > > > is >> > > > > > > >> more >> > > > > > > >> > > > > advanced >> > > > > > > >> > > > > > > > > > scenario >> > > > > > > >> > > > > > > > > > > > that the KIP does not handle? >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > Thanks, >> > > > > > > >> > > > > > > > > > > > Dong >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, >> > Guozhang >> > > > > Wang < >> > > > > > > >> > > > > > > wangg...@gmail.com> >> > > > > > > >> > > > > > > > > > > wrote: >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > Thanks Dong, I made a pass over the >> > wiki >> > > > and >> > > > > > it >> > > > > > > >> lgtm. 
>> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > Just a quick question: can we >> > completely >> > > > > > > >> eliminate the >> > > > > > > >> > > > > > > > > > > > > OffsetOutOfRangeException with this >> > > > > approach? >> > > > > > Say >> > > > > > > >> if >> > > > > > > >> > > there >> > > > > > > >> > > > > is >> > > > > > > >> > > > > > > > > > > consecutive >> > > > > > > >> > > > > > > > > > > > > leader changes such that the cached >> > > > > metadata's >> > > > > > > >> > > partition >> > > > > > > >> > > > > epoch >> > > > > > > >> > > > > > > is >> > > > > > > >> > > > > > > > > 1, >> > > > > > > >> > > > > > > > > > > and >> > > > > > > >> > > > > > > > > > > > > the metadata fetch response returns >> > > with >> > > > > > > >> partition >> > > > > > > >> > > epoch 2 >> > > > > > > >> > > > > > > > > pointing >> > > > > > > >> > > > > > > > > > to >> > > > > > > >> > > > > > > > > > > > > leader broker A, while the actual >> > > > up-to-date >> > > > > > > >> metadata >> > > > > > > >> > > has >> > > > > > > >> > > > > > > partition >> > > > > > > >> > > > > > > > > > > > epoch 3 >> > > > > > > >> > > > > > > > > > > > > whose leader is now broker B, the >> > > metadata >> > > > > > > >> refresh will >> > > > > > > >> > > > > still >> > > > > > > >> > > > > > > > > succeed >> > > > > > > >> > > > > > > > > > > and >> > > > > > > >> > > > > > > > > > > > > the follow-up fetch request may >> still >> > > see >> > > > > > OORE? >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > Guozhang >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, >> Dong >> > > Lin >> > > > < >> > > > > > > >> > > > > lindon...@gmail.com >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > > > wrote: >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > Hi all, >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > I would like to start the voting >> > > process >> > > > > for >> > > > > > > >> KIP-232: >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > https://cwiki.apache.org/ >> > > > > > > >> > > confluence/display/KAFKA/KIP- >> > > > > > > >> > > > > > > > > > > > > > 232%3A+Detect+outdated+metadat >> > > > > > > >> a+using+leaderEpoch+ >> > > > > > > >> > > > > > > > > > and+partitionEpoch >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > The KIP will help fix a >> concurrency >> > > > issue >> > > > > in >> > > > > > > >> Kafka >> > > > > > > >> > > which >> > > > > > > >> > > > > > > > > currently >> > > > > > > >> > > > > > > > > > > can >> > > > > > > >> > > > > > > > > > > > > > cause message loss or message >> > > > duplication >> > > > > in >> > > > > > > >> > > consumer. 
>> > > > > > > > > > > > > > Regards, >> > > > > > > > > > > > > > Dong
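To make the epoch checks discussed throughout the thread concrete: KIP-232 has the client remember a leaderEpoch and partitionEpoch alongside offsets and metadata, reject metadata older than what it has already seen, and detect a deleted-and-re-created topic by a changed partitionEpoch. A minimal client-side sketch with hypothetical names (the KIP's actual interfaces and exception types may differ):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of the client-side epoch checks discussed in the thread.
 * Names are illustrative; they are not the KIP's actual API.
 */
public class EpochTracker {
    private static final class Epochs {
        int partitionEpoch;
        int leaderEpoch;
        Epochs(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Last epochs observed per partition, keyed by "topic-partition".
    private final Map<String, Epochs> lastSeen = new HashMap<>();

    /**
     * Returns true if freshly fetched metadata for the partition may be used.
     * A changed partitionEpoch means the partition was deleted and re-created,
     * so previously stored offsets no longer refer to the same log; a lower
     * leaderEpoch means the metadata is older than what the client already
     * acted on and should be refreshed rather than used.
     */
    public boolean accept(String topicPartition, int partitionEpoch, int leaderEpoch) {
        Epochs prev = lastSeen.get(topicPartition);
        if (prev == null) {
            lastSeen.put(topicPartition, new Epochs(partitionEpoch, leaderEpoch));
            return true;
        }
        if (prev.partitionEpoch != partitionEpoch) {
            // Same topic-partition name, but a different incarnation of it.
            throw new IllegalStateException("Partition " + topicPartition
                    + " was re-created; stored offsets and epochs are invalid");
        }
        if (leaderEpoch < prev.leaderEpoch) {
            return false; // stale metadata; retry the metadata fetch
        }
        prev.leaderEpoch = leaderEpoch;
        return true;
    }
}
```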