Hi Becket,

I would argue that using IDs for partitions is not just a performance improvement, but actually a completely different way of accomplishing what this KIP is trying to solve. If you give partitions globally unique IDs, and use a different ID when re-creating a topic partition, you don't need a partition epoch to distinguish between the first and later incarnations of a topic. I think there are some advantages and disadvantages to both approaches.
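To make the contrast concrete, here is a rough sketch of the two kinds of identifiers we are comparing (illustrative only; these class names don't exist anywhere in the codebase):

    // Hypothetical sketch, not real Kafka classes.
    // KIP-232 approach: the partition keeps its (topic, partition) name, and a
    // partition epoch (allocated from a global counter in ZK) distinguishes
    // incarnations of that name across delete/re-create.
    final class NamePlusEpoch {
        final String topic;
        final int partition;
        final int partitionEpoch;
        NamePlusEpoch(String topic, int partition, int partitionEpoch) {
            this.topic = topic;
            this.partition = partition;
            this.partitionEpoch = partitionEpoch;
        }
    }

    // ID approach: every incarnation of a partition gets a globally unique,
    // never-reused ID at creation time, so no epoch is needed to tell
    // incarnations apart.
    final class UniquePartitionId {
        final long id;   // 64 bits, so it effectively never wraps
        UniquePartitionId(long id) { this.id = id; }
    }

With the ID approach, only the metadata response (and a few similar messages) would still need to carry the topic name, to let clients map names to IDs; the other RPCs could carry just the fixed-size ID.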
One thing that isn't clear to me is whether a 32-bit number would be enough for a partition epoch. If I understand correctly, the partition epoch for a newly created partition is taken from a global counter maintained in ZK. So I would expect a 32 or 31 bit counter to potentially wrap around if there are a lot of partition creations and deletions. best, Colin On Mon, Feb 5, 2018, at 14:43, Becket Qin wrote: > +1 on the KIP. > > I think the KIP is mainly about adding the capability of tracking the > system state change lineage. It does not seem necessary to bundle this KIP > with replacing the topic partition with partition epoch in produce/fetch. > Replacing topic-partition string with partition epoch is essentially a > performance improvement on top of this KIP. That can probably be done > separately. > > Thanks, > > Jiangjie (Becket) Qin > > On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com> wrote: > > > Hey Colin, > > > > On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <cmcc...@apache.org> wrote: > > > > > > On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <lindon...@gmail.com> > > wrote: > > > > > > > > > Hey Colin, > > > > > > > > > > I understand that the KIP will adds overhead by introducing > > > per-partition > > > > > partitionEpoch. I am open to alternative solutions that does not > > incur > > > > > additional overhead. But I don't see a better way now. > > > > > > > > > > IMO the overhead in the FetchResponse may not be that much. We > > probably > > > > > should discuss the percentage increase rather than the absolute > > number > > > > > increase. Currently after KIP-227, per-partition header has 23 bytes. > > > This > > > > > KIP adds another 4 bytes. Assume the records size is 10KB, the > > > percentage > > > > > increase is 4 / (23 + 10000) = 0.03%. It seems negligible, right? > > > > > > Hi Dong, > > > > > > Thanks for the response. I agree that the FetchRequest / FetchResponse > > > overhead should be OK, now that we have incremental fetch requests and > > > responses. However, there are a lot of cases where the percentage > > increase > > > is much greater. For example, if a client is doing full > > MetadataRequests / > > > Responses, we have some math kind of like this per partition: > > > > > > > UpdateMetadataRequestPartitionState => topic partition > > controller_epoch > > > leader leader_epoch partition_epoch isr zk_version replicas > > > offline_replicas > > > > 14 bytes: topic => string (assuming about 10 byte topic names) > > > > 4 bytes: partition => int32 > > > > 4 bytes: conroller_epoch => int32 > > > > 4 bytes: leader => int32 > > > > 4 bytes: leader_epoch => int32 > > > > +4 EXTRA bytes: partition_epoch => int32 <-- NEW > > > > 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR) > > > > 4 bytes: zk_version => int32 > > > > 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas) > > > > 2 offline_replicas => [int32] (assuming no offline replicas) > > > > > > Assuming I added that up correctly, the per-partition overhead goes from > > > 64 bytes per partition to 68, a 6.2% increase. > > > > > > We could do similar math for a lot of the other RPCs. And you will have > > a > > > similar memory and garbage collection impact on the brokers since you > > have > > > to store all this extra state as well. > > > > > > > That is correct. IMO the Metadata is only updated periodically and is > > probably not a big deal if we increase it by 6%. 
The FetchResponse and > > ProduceRequest are probably the only requests that are bounded by the > > bandwidth throughput. > > > > > > > > > > > > > > > > > I agree that we can probably save more space by using partition ID so > > > that > > > > > we no longer needs the string topic name. The similar idea has also > > > been > > > > > put in the Rejected Alternative section in KIP-227. While this idea > > is > > > > > promising, it seems orthogonal to the goal of this KIP. Given that > > > there is > > > > > already many work to do in this KIP, maybe we can do the partition ID > > > in a > > > > > separate KIP? > > > > > > I guess my thinking is that the goal here is to replace an identifier > > > which can be re-used (the tuple of topic name, partition ID) with an > > > identifier that cannot be re-used (the tuple of topic name, partition ID, > > > partition epoch) in order to gain better semantics. As long as we are > > > replacing the identifier, why not replace it with an identifier that has > > > important performance advantages? The KIP freeze for the next release > > has > > > already passed, so there is time to do this. > > > > > > > In general it can be easier for discussion and implementation if we can > > split a larger task into smaller and independent tasks. For example, > > KIP-112 and KIP-113 both deals with the JBOD support. KIP-31, KIP-32 and > > KIP-33 are about timestamp support. The option on this can be subject > > though. > > > > IMO the change to switch from (topic, partition ID) to partitionEpch in all > > request/response requires us to going through all request one by one. It > > may not be hard but it can be time consuming and tedious. At high level the > > goal and the change for that will be orthogonal to the changes required in > > this KIP. That is the main reason I think we can split them into two KIPs. > > > > > > > On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote: > > > > I think it is possible to move to entirely use partitionEpoch instead > > of > > > > (topic, partition) to identify a partition. Client can obtain the > > > > partitionEpoch -> (topic, partition) mapping from MetadataResponse. We > > > > probably need to figure out a way to assign partitionEpoch to existing > > > > partitions in the cluster. But this should be doable. > > > > > > > > This is a good idea. I think it will save us some space in the > > > > request/response. The actual space saving in percentage probably > > depends > > > on > > > > the amount of data and the number of partitions of the same topic. I > > just > > > > think we can do it in a separate KIP. > > > > > > Hmm. How much extra work would be required? It seems like we are > > already > > > changing almost every RPC that involves topics and partitions, already > > > adding new per-partition state to ZooKeeper, already changing how clients > > > interact with partitions. Is there some other big piece of work we'd > > have > > > to do to move to partition IDs that we wouldn't need for partition > > epochs? > > > I guess we'd have to find a way to support regular expression-based topic > > > subscriptions. If we split this into multiple KIPs, wouldn't we end up > > > changing all that RPCs and ZK state a second time? Also, I'm curious if > > > anyone has done any proof of concept GC, memory, and network usage > > > measurements on switching topic names for topic IDs. > > > > > > > > > We will need to go over all requests/responses to check how to replace > > (topic, partition ID) with partition epoch. 
It requires non-trivial work > > and could take time. As you mentioned, we may want to see how much saving > > we can get by switching from topic names to partition epoch. That itself > > requires time and experiment. It seems that the new idea does not rollback > > any change proposed in this KIP. So I am not sure we can get much by > > putting them into the same KIP. > > > > Anyway, if more people are interested in seeing the new idea in the same > > KIP, I can try that. > > > > > > > > > > > > best, > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <cmcc...@apache.org> > > > wrote: > > > > > > > > > >> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote: > > > > >> > Hey Colin, > > > > >> > > > > > >> > > > > > >> > On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe < > > cmcc...@apache.org> > > > > >> wrote: > > > > >> > > > > > >> > > On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote: > > > > >> > > > Hey Colin, > > > > >> > > > > > > > >> > > > Thanks for the comment. > > > > >> > > > > > > > >> > > > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe < > > > cmcc...@apache.org> > > > > >> > > wrote: > > > > >> > > > > > > > >> > > > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote: > > > > >> > > > > > Hey Colin, > > > > >> > > > > > > > > > >> > > > > > Thanks for reviewing the KIP. > > > > >> > > > > > > > > > >> > > > > > If I understand you right, you maybe suggesting that we > > can > > > use > > > > >> a > > > > >> > > global > > > > >> > > > > > metadataEpoch that is incremented every time controller > > > updates > > > > >> > > metadata. > > > > >> > > > > > The problem with this solution is that, if a topic is > > > deleted > > > > >> and > > > > >> > > created > > > > >> > > > > > again, user will not know whether that the offset which is > > > > >> stored > > > > >> > > before > > > > >> > > > > > the topic deletion is no longer valid. This motivates the > > > idea > > > > >> to > > > > >> > > include > > > > >> > > > > > per-partition partitionEpoch. Does this sound reasonable? > > > > >> > > > > > > > > >> > > > > Hi Dong, > > > > >> > > > > > > > > >> > > > > Perhaps we can store the last valid offset of each deleted > > > topic > > > > >> in > > > > >> > > > > ZooKeeper. Then, when a topic with one of those names gets > > > > >> > > re-created, we > > > > >> > > > > can start the topic at the previous end offset rather than > > at > > > 0. > > > > >> This > > > > >> > > > > preserves immutability. It is no more burdensome than > > having > > > to > > > > >> > > preserve a > > > > >> > > > > "last epoch" for the deleted partition somewhere, right? > > > > >> > > > > > > > > >> > > > > > > > >> > > > My concern with this solution is that the number of zookeeper > > > nodes > > > > >> get > > > > >> > > > more and more over time if some users keep deleting and > > creating > > > > >> topics. > > > > >> > > Do > > > > >> > > > you think this can be a problem? > > > > >> > > > > > > >> > > Hi Dong, > > > > >> > > > > > > >> > > We could expire the "partition tombstones" after an hour or so. > > > In > > > > >> > > practice this would solve the issue for clients that like to > > > destroy > > > > >> and > > > > >> > > re-create topics all the time. In any case, doesn't the current > > > > >> proposal > > > > >> > > add per-partition znodes as well that we have to track even > > after > > > the > > > > >> > > partition is deleted? 
Or did I misunderstand that? > > > > >> > > > > > > >> > > > > > >> > Actually the current KIP does not add per-partition znodes. Could > > > you > > > > >> > double check? I can fix the KIP wiki if there is anything > > > misleading. > > > > >> > > > > >> Hi Dong, > > > > >> > > > > >> I double-checked the KIP, and I can see that you are in fact using a > > > > >> global counter for initializing partition epochs. So, you are > > > correct, it > > > > >> doesn't add per-partition znodes for partitions that no longer > > exist. > > > > >> > > > > >> > > > > > >> > If we expire the "partition tomstones" after an hour, and the > > topic > > > is > > > > >> > re-created after more than an hour since the topic deletion, then > > > we are > > > > >> > back to the situation where user can not tell whether the topic > > has > > > been > > > > >> > re-created or not, right? > > > > >> > > > > >> Yes, with an expiration period, it would not ensure immutability-- > > you > > > > >> could effectively reuse partition names and they would look the > > same. > > > > >> > > > > >> > > > > > >> > > > > > >> > > > > > > >> > > It's not really clear to me what should happen when a topic is > > > > >> destroyed > > > > >> > > and re-created with new data. Should consumers continue to be > > > able to > > > > >> > > consume? We don't know where they stopped consuming from the > > > previous > > > > >> > > incarnation of the topic, so messages may have been lost. > > > Certainly > > > > >> > > consuming data from offset X of the new incarnation of the topic > > > may > > > > >> give > > > > >> > > something totally different from what you would have gotten from > > > > >> offset X > > > > >> > > of the previous incarnation of the topic. > > > > >> > > > > > > >> > > > > > >> > With the current KIP, if a consumer consumes a topic based on the > > > last > > > > >> > remembered (offset, partitionEpoch, leaderEpoch), and if the topic > > > is > > > > >> > re-created, consume will throw InvalidPartitionEpochException > > > because > > > > >> the > > > > >> > previous partitionEpoch will be different from the current > > > > >> partitionEpoch. > > > > >> > This is described in the Proposed Changes -> Consumption after > > topic > > > > >> > deletion in the KIP. I can improve the KIP if there is anything > > not > > > > >> clear. > > > > >> > > > > >> Thanks for the clarification. It sounds like what you really want > > is > > > > >> immutability-- i.e., to never "really" reuse partition identifiers. > > > And > > > > >> you do this by making the partition name no longer the "real" > > > identifier. > > > > >> > > > > >> My big concern about this KIP is that it seems like an > > > anti-scalability > > > > >> feature. Now we are adding 4 extra bytes for every partition in the > > > > >> FetchResponse and Request, for example. That could be 40 kb per > > > request, > > > > >> if the user has 10,000 partitions. And of course, the KIP also > > makes > > > > >> massive changes to UpdateMetadataRequest, MetadataResponse, > > > > >> OffsetCommitRequest, OffsetFetchResponse, LeaderAndIsrRequest, > > > > >> ListOffsetResponse, etc. which will also increase their size on the > > > wire > > > > >> and in memory. > > > > >> > > > > >> One thing that we talked a lot about in the past is replacing > > > partition > > > > >> names with IDs. IDs have a lot of really nice features. They take > > > up much > > > > >> less space in memory than strings (especially 2-byte Java strings). 
> > > They > > > > >> can often be allocated on the stack rather than the heap (important > > > when > > > > >> you are dealing with hundreds of thousands of them). They can be > > > > >> efficiently deserialized and serialized. If we use 64-bit ones, we > > > will > > > > >> never run out of IDs, which means that they can always be unique per > > > > >> partition. > > > > >> > > > > >> Given that the partition name is no longer the "real" identifier for > > > > >> partitions in the current KIP-232 proposal, why not just move to > > using > > > > >> partition IDs entirely instead of strings? You have to change all > > the > > > > >> messages anyway. There isn't much point any more to carrying around > > > the > > > > >> partition name in every RPC, since you really need (name, epoch) to > > > > >> identify the partition. > > > > >> Probably the metadata response and a few other messages would have > > to > > > > >> still carry the partition name, to allow clients to go from name to > > > id. > > > > >> But we could mostly forget about the strings. And then this would > > be > > > a > > > > >> scalability improvement rather than a scalability problem. > > > > >> > > > > >> > > > > > >> > > > > > >> > > By choosing to reuse the same (topic, partition, offset) > > 3-tuple, > > > we > > > > >> have > > > > >> > > > > > >> > chosen to give up immutability. That was a really bad decision. > > > And > > > > >> now > > > > >> > > we have to worry about time dependencies, stale cached data, and > > > all > > > > >> the > > > > >> > > rest. We can't completely fix this inside Kafka no matter what > > > we do, > > > > >> > > because not all that cached data is inside Kafka itself. Some > > of > > > it > > > > >> may be > > > > >> > > in systems that Kafka has sent data to, such as other daemons, > > SQL > > > > >> > > databases, streams, and so forth. > > > > >> > > > > > > >> > > > > > >> > The current KIP will uniquely identify a message using (topic, > > > > >> partition, > > > > >> > offset, partitionEpoch) 4-tuple. This addresses the message > > > immutability > > > > >> > issue that you mentioned. Is there any corner case where the > > message > > > > >> > immutability is still not preserved with the current KIP? > > > > >> > > > > > >> > > > > > >> > > > > > > >> > > I guess the idea here is that mirror maker should work as > > expected > > > > >> when > > > > >> > > users destroy a topic and re-create it with the same name. > > That's > > > > >> kind of > > > > >> > > tough, though, since in that scenario, mirror maker probably > > > should > > > > >> destroy > > > > >> > > and re-create the topic on the other end, too, right? > > Otherwise, > > > > >> what you > > > > >> > > end up with on the other end could be half of one incarnation of > > > the > > > > >> topic, > > > > >> > > and half of another. > > > > >> > > > > > > >> > > What mirror maker really needs is to be able to follow a stream > > of > > > > >> events > > > > >> > > about the kafka cluster itself. We could have some master topic > > > > >> which is > > > > >> > > always present and which contains data about all topic > > deletions, > > > > >> > > creations, etc. Then MM can simply follow this topic and do > > what > > > is > > > > >> needed. 
> > > > >> > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > > >> > > > > > > > > > >> > > > > > Then the next question maybe, should we use a global > > > > >> metadataEpoch + > > > > >> > > > > > per-partition partitionEpoch, instead of using > > per-partition > > > > >> > > leaderEpoch > > > > >> > > > > + > > > > >> > > > > > per-partition leaderEpoch. The former solution using > > > > >> metadataEpoch > > > > >> > > would > > > > >> > > > > > not work due to the following scenario (provided by Jun): > > > > >> > > > > > > > > > >> > > > > > "Consider the following scenario. In metadata v1, the > > leader > > > > >> for a > > > > >> > > > > > partition is at broker 1. In metadata v2, leader is at > > > broker > > > > >> 2. In > > > > >> > > > > > metadata v3, leader is at broker 1 again. The last > > committed > > > > >> offset > > > > >> > > in > > > > >> > > > > v1, > > > > >> > > > > > v2 and v3 are 10, 20 and 30, respectively. A consumer is > > > > >> started and > > > > >> > > > > reads > > > > >> > > > > > metadata v1 and reads messages from offset 0 to 25 from > > > broker > > > > >> 1. My > > > > >> > > > > > understanding is that in the current proposal, the > > metadata > > > > >> version > > > > >> > > > > > associated with offset 25 is v1. The consumer is then > > > restarted > > > > >> and > > > > >> > > > > fetches > > > > >> > > > > > metadata v2. The consumer tries to read from broker 2, > > > which is > > > > >> the > > > > >> > > old > > > > >> > > > > > leader with the last offset at 20. In this case, the > > > consumer > > > > >> will > > > > >> > > still > > > > >> > > > > > get OffsetOutOfRangeException incorrectly." > > > > >> > > > > > > > > > >> > > > > > Regarding your comment "For the second purpose, this is > > > "soft > > > > >> state" > > > > >> > > > > > anyway. If the client thinks X is the leader but Y is > > > really > > > > >> the > > > > >> > > leader, > > > > >> > > > > > the client will talk to X, and X will point out its > > mistake > > > by > > > > >> > > sending > > > > >> > > > > back > > > > >> > > > > > a NOT_LEADER_FOR_PARTITION.", it is probably no true. The > > > > >> problem > > > > >> > > here is > > > > >> > > > > > that the old leader X may still think it is the leader of > > > the > > > > >> > > partition > > > > >> > > > > and > > > > >> > > > > > thus it will not send back NOT_LEADER_FOR_PARTITION. The > > > reason > > > > >> is > > > > >> > > > > provided > > > > >> > > > > > in KAFKA-6262. Can you check if that makes sense? > > > > >> > > > > > > > > >> > > > > This is solvable with a timeout, right? If the leader can't > > > > >> > > communicate > > > > >> > > > > with the controller for a certain period of time, it should > > > stop > > > > >> > > acting as > > > > >> > > > > the leader. We have to solve this problem, anyway, in order > > > to > > > > >> fix > > > > >> > > all the > > > > >> > > > > corner cases. > > > > >> > > > > > > > > >> > > > > > > > >> > > > Not sure if I fully understand your proposal. The proposal > > > seems to > > > > >> > > require > > > > >> > > > non-trivial changes to our existing leadership election > > > mechanism. > > > > >> Could > > > > >> > > > you provide more detail regarding how it works? For example, > > how > > > > >> should > > > > >> > > > user choose this timeout, how leader determines whether it can > > > still > > > > >> > > > communicate with controller, and how this triggers controller > > to > > > > >> elect > > > > >> > > new > > > > >> > > > leader? 
> > > > >> > > > > > > >> > > Before I come up with any proposal, let me make sure I > > understand > > > the > > > > >> > > problem correctly. My big question was, what prevents > > split-brain > > > > >> here? > > > > >> > > > > > > >> > > Let's say I have a partition which is on nodes A, B, and C, with > > > > >> min-ISR > > > > >> > > 2. The controller is D. At some point, there is a network > > > partition > > > > >> > > between A and B and the rest of the cluster. The Controller > > > > >> re-assigns the > > > > >> > > partition to nodes C, D, and E. But A and B keep chugging away, > > > even > > > > >> > > though they can no longer communicate with the controller. > > > > >> > > > > > > >> > > At some point, a client with stale metadata writes to the > > > partition. > > > > >> It > > > > >> > > still thinks the partition is on node A, B, and C, so that's > > > where it > > > > >> sends > > > > >> > > the data. It's unable to talk to C, but A and B reply back that > > > all > > > > >> is > > > > >> > > well. > > > > >> > > > > > > >> > > Is this not a case where we could lose data due to split brain? > > > Or is > > > > >> > > there a mechanism for preventing this that I missed? If it is, > > it > > > > >> seems > > > > >> > > like a pretty serious failure case that we should be handling > > > with our > > > > >> > > metadata rework. And I think epoch numbers and timeouts might > > be > > > > >> part of > > > > >> > > the solution. > > > > >> > > > > > > >> > > > > > >> > Right, split brain can happen if RF=4 and minIsr=2. However, I am > > > not > > > > >> sure > > > > >> > it is a pretty serious issue which we need to address today. This > > > can be > > > > >> > prevented by configuring the Kafka topic so that minIsr > RF/2. > > > > >> Actually, > > > > >> > if user sets minIsr=2, is there anything reason that user wants to > > > set > > > > >> RF=4 > > > > >> > instead of 4? > > > > >> > > > > > >> > Introducing timeout in leader election mechanism is non-trivial. I > > > > >> think we > > > > >> > probably want to do that only if there is good use-case that can > > not > > > > >> > otherwise be addressed with the current mechanism. > > > > >> > > > > >> I still would like to think about these corner cases more. But > > > perhaps > > > > >> it's not directly related to this KIP. > > > > >> > > > > >> regards, > > > > >> Colin > > > > >> > > > > >> > > > > >> > > > > > >> > > > > > >> > > best, > > > > >> > > Colin > > > > >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > best, > > > > >> > > > > Colin > > > > >> > > > > > > > > >> > > > > > > > > > >> > > > > > Regards, > > > > >> > > > > > Dong > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe < > > > > >> cmcc...@apache.org> > > > > >> > > > > wrote: > > > > >> > > > > > > > > > >> > > > > > > Hi Dong, > > > > >> > > > > > > > > > > >> > > > > > > Thanks for proposing this KIP. I think a metadata epoch > > > is a > > > > >> > > really > > > > >> > > > > good > > > > >> > > > > > > idea. > > > > >> > > > > > > > > > > >> > > > > > > I read through the DISCUSS thread, but I still don't > > have > > > a > > > > >> clear > > > > >> > > > > picture > > > > >> > > > > > > of why the proposal uses a metadata epoch per partition > > > rather > > > > >> > > than a > > > > >> > > > > > > global metadata epoch. 
A metadata epoch per partition > > is > > > > >> kind of > > > > >> > > > > > > unpleasant-- it's at least 4 extra bytes per partition > > > that we > > > > >> > > have to > > > > >> > > > > send > > > > >> > > > > > > over the wire in every full metadata request, which > > could > > > > >> become > > > > >> > > extra > > > > >> > > > > > > kilobytes on the wire when the number of partitions > > > becomes > > > > >> large. > > > > >> > > > > Plus, > > > > >> > > > > > > we have to update all the auxillary classes to include > > an > > > > >> epoch. > > > > >> > > > > > > > > > > >> > > > > > > We need to have a global metadata epoch anyway to handle > > > > >> partition > > > > >> > > > > > > addition and deletion. For example, if I give you > > > > >> > > > > > > MetadataResponse{part1,epoch 1, part2, epoch 1} and > > > {part1, > > > > >> > > epoch1}, > > > > >> > > > > which > > > > >> > > > > > > MetadataResponse is newer? You have no way of knowing. > > > It > > > > >> could > > > > >> > > be > > > > >> > > > > that > > > > >> > > > > > > part2 has just been created, and the response with 2 > > > > >> partitions is > > > > >> > > > > newer. > > > > >> > > > > > > Or it coudl be that part2 has just been deleted, and > > > > >> therefore the > > > > >> > > > > response > > > > >> > > > > > > with 1 partition is newer. You must have a global epoch > > > to > > > > >> > > > > disambiguate > > > > >> > > > > > > these two cases. > > > > >> > > > > > > > > > > >> > > > > > > Previously, I worked on the Ceph distributed filesystem. > > > > >> Ceph had > > > > >> > > the > > > > >> > > > > > > concept of a map of the whole cluster, maintained by a > > few > > > > >> servers > > > > >> > > > > doing > > > > >> > > > > > > paxos. This map was versioned by a single 64-bit epoch > > > number > > > > >> > > which > > > > >> > > > > > > increased on every change. It was propagated to clients > > > > >> through > > > > >> > > > > gossip. I > > > > >> > > > > > > wonder if something similar could work here? > > > > >> > > > > > > > > > > >> > > > > > > It seems like the the Kafka MetadataResponse serves two > > > > >> somewhat > > > > >> > > > > unrelated > > > > >> > > > > > > purposes. Firstly, it lets clients know what partitions > > > > >> exist in > > > > >> > > the > > > > >> > > > > > > system and where they live. Secondly, it lets clients > > > know > > > > >> which > > > > >> > > nodes > > > > >> > > > > > > within the partition are in-sync (in the ISR) and which > > > node > > > > >> is the > > > > >> > > > > leader. > > > > >> > > > > > > > > > > >> > > > > > > The first purpose is what you really need a metadata > > epoch > > > > >> for, I > > > > >> > > > > think. > > > > >> > > > > > > You want to know whether a partition exists or not, or > > you > > > > >> want to > > > > >> > > know > > > > >> > > > > > > which nodes you should talk to in order to write to a > > > given > > > > >> > > > > partition. A > > > > >> > > > > > > single metadata epoch for the whole response should be > > > > >> adequate > > > > >> > > here. > > > > >> > > > > We > > > > >> > > > > > > should not change the partition assignment without going > > > > >> through > > > > >> > > > > zookeeper > > > > >> > > > > > > (or a similar system), and this inherently serializes > > > updates > > > > >> into > > > > >> > > a > > > > >> > > > > > > numbered stream. 
Brokers should also stop responding to > > > > >> requests > > > > >> > > when > > > > >> > > > > they > > > > >> > > > > > > are unable to contact ZK for a certain time period. > > This > > > > >> prevents > > > > >> > > the > > > > >> > > > > case > > > > >> > > > > > > where a given partition has been moved off some set of > > > nodes, > > > > >> but a > > > > >> > > > > client > > > > >> > > > > > > still ends up talking to those nodes and writing data > > > there. > > > > >> > > > > > > > > > > >> > > > > > > For the second purpose, this is "soft state" anyway. If > > > the > > > > >> client > > > > >> > > > > thinks > > > > >> > > > > > > X is the leader but Y is really the leader, the client > > > will > > > > >> talk > > > > >> > > to X, > > > > >> > > > > and > > > > >> > > > > > > X will point out its mistake by sending back a > > > > >> > > > > NOT_LEADER_FOR_PARTITION. > > > > >> > > > > > > Then the client can update its metadata again and find > > > the new > > > > >> > > leader, > > > > >> > > > > if > > > > >> > > > > > > there is one. There is no need for an epoch to handle > > > this. > > > > >> > > > > Similarly, I > > > > >> > > > > > > can't think of a reason why changing the in-sync replica > > > set > > > > >> needs > > > > >> > > to > > > > >> > > > > bump > > > > >> > > > > > > the epoch. > > > > >> > > > > > > > > > > >> > > > > > > best, > > > > >> > > > > > > Colin > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote: > > > > >> > > > > > > > Thanks much for reviewing the KIP! > > > > >> > > > > > > > > > > > >> > > > > > > > Dong > > > > >> > > > > > > > > > > > >> > > > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang < > > > > >> > > wangg...@gmail.com> > > > > >> > > > > > > wrote: > > > > >> > > > > > > > > > > > >> > > > > > > > > Yeah that makes sense, again I'm just making sure we > > > > >> understand > > > > >> > > > > all the > > > > >> > > > > > > > > scenarios and what to expect. > > > > >> > > > > > > > > > > > > >> > > > > > > > > I agree that if, more generally speaking, say users > > > have > > > > >> only > > > > >> > > > > consumed > > > > >> > > > > > > to > > > > >> > > > > > > > > offset 8, and then call seek(16) to "jump" to a > > > further > > > > >> > > position, > > > > >> > > > > then > > > > >> > > > > > > she > > > > >> > > > > > > > > needs to be aware that OORE maybe thrown and she > > > needs to > > > > >> > > handle > > > > >> > > > > it or > > > > >> > > > > > > rely > > > > >> > > > > > > > > on reset policy which should not surprise her. > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > I'm +1 on the KIP. > > > > >> > > > > > > > > > > > > >> > > > > > > > > Guozhang > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin < > > > > >> > > lindon...@gmail.com> > > > > >> > > > > > > wrote: > > > > >> > > > > > > > > > > > > >> > > > > > > > > > Yes, in general we can not prevent > > > > >> OffsetOutOfRangeException > > > > >> > > if > > > > >> > > > > user > > > > >> > > > > > > > > seeks > > > > >> > > > > > > > > > to a wrong offset. The main goal is to prevent > > > > >> > > > > > > OffsetOutOfRangeException > > > > >> > > > > > > > > if > > > > >> > > > > > > > > > user has done things in the right way, e.g. 
user > > > should > > > > >> know > > > > >> > > that > > > > >> > > > > > > there > > > > >> > > > > > > > > is > > > > >> > > > > > > > > > message with this offset. > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > For example, if user calls seek(..) right after > > > > >> > > construction, the > > > > >> > > > > > > only > > > > >> > > > > > > > > > reason I can think of is that user stores offset > > > > >> externally. > > > > >> > > In > > > > >> > > > > this > > > > >> > > > > > > > > case, > > > > >> > > > > > > > > > user currently needs to use the offset which is > > > obtained > > > > >> > > using > > > > >> > > > > > > > > position(..) > > > > >> > > > > > > > > > from the last run. With this KIP, user needs to > > get > > > the > > > > >> > > offset > > > > >> > > > > and > > > > >> > > > > > > the > > > > >> > > > > > > > > > offsetEpoch using positionAndOffsetEpoch(...) and > > > stores > > > > >> > > these > > > > >> > > > > > > > > information > > > > >> > > > > > > > > > externally. The next time user starts consumer, > > > he/she > > > > >> needs > > > > >> > > to > > > > >> > > > > call > > > > >> > > > > > > > > > seek(..., offset, offsetEpoch) right after > > > construction. > > > > >> > > Then KIP > > > > >> > > > > > > should > > > > >> > > > > > > > > be > > > > >> > > > > > > > > > able to ensure that we don't throw > > > > >> OffsetOutOfRangeException > > > > >> > > if > > > > >> > > > > > > there is > > > > >> > > > > > > > > no > > > > >> > > > > > > > > > unclean leader election. > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > Does this sound OK? > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > Regards, > > > > >> > > > > > > > > > Dong > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang < > > > > >> > > > > wangg...@gmail.com> > > > > >> > > > > > > > > > wrote: > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > "If consumer wants to consume message with > > offset > > > 16, > > > > >> then > > > > >> > > > > consumer > > > > >> > > > > > > > > must > > > > >> > > > > > > > > > > have > > > > >> > > > > > > > > > > already fetched message with offset 15" > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > --> this may not be always true right? What if > > > > >> consumer > > > > >> > > just > > > > >> > > > > call > > > > >> > > > > > > > > > seek(16) > > > > >> > > > > > > > > > > after construction and then poll without > > committed > > > > >> offset > > > > >> > > ever > > > > >> > > > > > > stored > > > > >> > > > > > > > > > > before? Admittedly it is rare but we do not > > > > >> programmably > > > > >> > > > > disallow > > > > >> > > > > > > it. > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > Guozhang > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin < > > > > >> > > > > lindon...@gmail.com> > > > > >> > > > > > > > > wrote: > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > Hey Guozhang, > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > Thanks much for reviewing the KIP! 
> > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > In the scenario you described, let's assume > > that > > > > >> broker > > > > >> > > A has > > > > >> > > > > > > > > messages > > > > >> > > > > > > > > > > with > > > > >> > > > > > > > > > > > offset up to 10, and broker B has messages > > with > > > > >> offset > > > > >> > > up to > > > > >> > > > > 20. > > > > >> > > > > > > If > > > > >> > > > > > > > > > > > consumer wants to consume message with offset > > > 9, it > > > > >> will > > > > >> > > not > > > > >> > > > > > > receive > > > > >> > > > > > > > > > > > OffsetOutOfRangeException > > > > >> > > > > > > > > > > > from broker A. > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > If consumer wants to consume message with > > offset > > > > >> 16, then > > > > >> > > > > > > consumer > > > > >> > > > > > > > > must > > > > >> > > > > > > > > > > > have already fetched message with offset 15, > > > which > > > > >> can > > > > >> > > only > > > > >> > > > > come > > > > >> > > > > > > from > > > > >> > > > > > > > > > > > broker B. Because consumer will fetch from > > > broker B > > > > >> only > > > > >> > > if > > > > >> > > > > > > > > leaderEpoch > > > > >> > > > > > > > > > > >= > > > > >> > > > > > > > > > > > 2, then the current consumer leaderEpoch can > > > not be > > > > >> 1 > > > > >> > > since > > > > >> > > > > this > > > > >> > > > > > > KIP > > > > >> > > > > > > > > > > > prevents leaderEpoch rewind. Thus we will not > > > have > > > > >> > > > > > > > > > > > OffsetOutOfRangeException > > > > >> > > > > > > > > > > > in this case. > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > Does this address your question, or maybe > > there > > > is > > > > >> more > > > > >> > > > > advanced > > > > >> > > > > > > > > > scenario > > > > >> > > > > > > > > > > > that the KIP does not handle? > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > Thanks, > > > > >> > > > > > > > > > > > Dong > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang > > Wang < > > > > >> > > > > > > wangg...@gmail.com> > > > > >> > > > > > > > > > > wrote: > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > Thanks Dong, I made a pass over the wiki and > > > it > > > > >> lgtm. > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > Just a quick question: can we completely > > > > >> eliminate the > > > > >> > > > > > > > > > > > > OffsetOutOfRangeException with this > > approach? 
> > > Say > > > > >> if > > > > >> > > there > > > > >> > > > > is > > > > >> > > > > > > > > > > consecutive > > > > >> > > > > > > > > > > > > leader changes such that the cached > > metadata's > > > > >> > > partition > > > > >> > > > > epoch > > > > >> > > > > > > is > > > > >> > > > > > > > > 1, > > > > >> > > > > > > > > > > and > > > > >> > > > > > > > > > > > > the metadata fetch response returns with > > > > >> partition > > > > >> > > epoch 2 > > > > >> > > > > > > > > pointing > > > > >> > > > > > > > > > to > > > > >> > > > > > > > > > > > > leader broker A, while the actual up-to-date > > > > >> metadata > > > > >> > > has > > > > >> > > > > > > partition > > > > >> > > > > > > > > > > > epoch 3 > > > > >> > > > > > > > > > > > > whose leader is now broker B, the metadata > > > > >> refresh will > > > > >> > > > > still > > > > >> > > > > > > > > succeed > > > > >> > > > > > > > > > > and > > > > >> > > > > > > > > > > > > the follow-up fetch request may still see > > > OORE? > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > Guozhang > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin < > > > > >> > > > > lindon...@gmail.com > > > > >> > > > > > > > > > > > >> > > > > > > > > > wrote: > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Hi all, > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > I would like to start the voting process > > for > > > > >> KIP-232: > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > https://cwiki.apache.org/ > > > > >> > > confluence/display/KAFKA/KIP- > > > > >> > > > > > > > > > > > > > 232%3A+Detect+outdated+metadat > > > > >> a+using+leaderEpoch+ > > > > >> > > > > > > > > > and+partitionEpoch > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > The KIP will help fix a concurrency issue > > in > > > > >> Kafka > > > > >> > > which > > > > >> > > > > > > > > currently > > > > >> > > > > > > > > > > can > > > > >> > > > > > > > > > > > > > cause message loss or message duplication > > in > > > > >> > > consumer. > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Regards, > > > > >> > > > > > > > > > > > > > Dong > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > -- > > > > >> > > > > > > > > > > > > -- Guozhang > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > -- > > > > >> > > > > > > > > > > -- Guozhang > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > -- > > > > >> > > > > > > > > -- Guozhang > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > >