[jira] [Created] (KAFKA-16281) Probable IllegalState possible with KIP-966
Jack Vanlightly created KAFKA-16281:
---
Summary: Probable IllegalState possible with KIP-966
Key: KAFKA-16281
URL: https://issues.apache.org/jira/browse/KAFKA-16281
Project: Kafka
Issue Type: Task
Components: kraft
Reporter: Jack Vanlightly

I have a TLA+ model of KIP-996 and I have identified an IllegalState exception that would occur with the existing MaybeHandleCommonResponse behavior.

The issue stems from the fact that a leader, let's call it r1, can resign (either due to a restart or check quorum) and then later initiate a pre-vote where it ends up in the same epoch as before but with its local leader id cleared. When r1 transitions to Prospective it clears its local leader id. When r1 receives a response from r2, who believes that r1 is still the leader, the logic in MaybeHandleCommonResponse tries to transition r1 to follower of itself, causing an IllegalState exception to be raised.

This is an example history:
# r1 is the leader in epoch 1.
# r1 resigns due to check quorum, or restarts and resigns.
# r1 experiences an election timeout and transitions to Prospective, clearing its local leader id.
# r1 sends a pre-vote request to its peers.
# r2 thinks r1 is still the leader, so it sends a vote response that does not grant its vote and sets leaderId=r1 and epoch=1.
# r1 receives the vote response and executes MaybeHandleCommonResponse, which tries to transition r1 to Follower of itself, and an illegal state occurs.

The relevant else if statement in MaybeHandleCommonResponse is here: https://github.com/apache/kafka/blob/a26a1d847f1884a519561e7a4fb4cd13e051c824/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1538

In the TLA+ specification, I fixed this issue by adding a fourth condition to this statement: that the leaderId also does not equal this server's id.
https://github.com/Vanlightly/kafka-tlaplus/blob/9b2600d1cd5c65930d666b12792d47362b64c015/kraft/kip_996/kraft_kip_996_functions.tla#L336

We should probably create a test to confirm the issue first and then look at using the fix I made in the TLA+, though there may be other options.
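To make the failing branch concrete, here is a minimal Java sketch of the guard. It is a hypothetical, simplified model: the class and method names are invented, and the first three conditions only approximate the linked `else if` (same epoch, the response names a leader, and this node currently knows of no leader). Only the fourth condition comes from the TLA+ fix described above.

```java
import java.util.OptionalInt;

// Hypothetical, simplified model of the "learn the leader from a response" branch.
// The first three conditions approximate the linked code; they are not copied from it.
final class CommonResponseGuard {
    private CommonResponseGuard() {}

    // Without the fix: same epoch, the response names a leader, and we know of no leader.
    static boolean shouldBecomeFollowerUnpatched(int responseEpoch, OptionalInt responseLeaderId,
                                                 int localEpoch, boolean localHasLeader) {
        return responseEpoch == localEpoch
                && responseLeaderId.isPresent()
                && !localHasLeader;
    }

    // With the fourth condition from the TLA+ fix: the named leader must not be this node.
    static boolean shouldBecomeFollower(int responseEpoch, OptionalInt responseLeaderId,
                                        int localEpoch, boolean localHasLeader, int localId) {
        return shouldBecomeFollowerUnpatched(responseEpoch, responseLeaderId, localEpoch, localHasLeader)
                && responseLeaderId.getAsInt() != localId; // only reached when a leader id is present
    }
}
```

For the history above (r1 has id 1, is Prospective in epoch 1 with no known leader, and r2 replies with leaderId=1 and epoch=1), `shouldBecomeFollowerUnpatched(1, OptionalInt.of(1), 1, false)` returns true, which is the self-transition that raises IllegalState, while `shouldBecomeFollower(1, OptionalInt.of(1), 1, false, 1)` returns false. A response naming a different leader, such as r2, still triggers the transition.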
Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes
Hi Jose,

I have a question about how voters and observers that are far behind the leader catch up when there are multiple reconfiguration commands in the log between their position and the end of the log. Here are some example situations that need clarification:

Example 1
Imagine a cluster of three voters: r1, r2, r3. Voter r3 goes offline for a while. In the meantime, r1 dies and gets replaced with r4, and r2 dies and gets replaced with r5. Now the cluster is formed of r3, r4, r5. When r3 comes back online, it tries to fetch from dead nodes and finally starts unending leader elections, stuck because it doesn't realise it's in a stale configuration whose members are all dead except for itself.

Example 2
Imagine a cluster of three voters: r1, r2, r3. Voter r3 goes offline, then comes back and discovers the leader is r1. Again, there are many reconfiguration commands between its LEO and the end of the leader's log. It starts fetching, changing configurations as it goes, until it reaches a stale configuration (r3, r4, r5) where it is a member but none of its peers are actually alive anymore. It continues to fetch from r1, but then for some reason the connection to r1 is interrupted. r3 starts leader elections which don't get responses.

Example 3
Imagine a cluster of three voters: r1, r2, r3. Over time, many reconfigurations have happened and now the voters are (r4, r5, r6). The observer o1 starts fetching from the nodes in 'controller.quorum.bootstrap.servers', which includes r4. r4 responds with NotLeader, indicating that r5 is the leader. o1 starts fetching and goes through the motions of switching to each configuration as it learns of it in the log. The connection to r5 gets interrupted while o1 is in the configuration (r7, r8, r9). It attempts to fetch from these voters, but none respond because they are all long dead; this is a stale configuration. Does the observer fall back to 'controller.quorum.bootstrap.servers' for its list of voters it can fetch from?
After thinking it through, it occurs to me that in examples 1 and 2, the leader (of the latest configuration) should be sending BeginQuorumEpoch requests to r3 after a certain timeout? r3 can start elections (based on its stale configuration) which will go nowhere, until it eventually receives a BeginQuorumEpoch from the leader; it will then learn of the leader and resume fetching.

In the case of an observer, I suppose it must fall back to 'controller.quorum.voters' or 'controller.quorum.bootstrap.servers' to learn of the leader?

Thanks
Jack

On Fri, Jan 26, 2024 at 1:36 AM José Armando García Sancio wrote: > Hi all, > > I have updated the KIP to include information on how KRaft controller > automatic joining will work. > > Thanks, > -- > -José >
Re: How Kafka handle partition leader change?
If you want to understand how the replication protocol works, and how it can be configured for consistency or for availability, I have written up a more formal description of the protocol along with TLA+ specifications. These should answer most of your questions; if not, please do come back and ask further questions.

How it works today:
- Formal description: https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/v3.5/description/0_kafka_replication_protocol.md
- TLA+ specification: https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/v3.5/kafka_replication_v3_5.tla

How it will work when KIP-966 is implemented:
- Formal description: https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/kip-966/description/0_kafka_replication_protocol.md
- TLA+ specification: https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/kip-966/kafka_replication_kip_966.tla

Hope that helps
Jack

On Wed, Nov 22, 2023 at 6:27 AM De Gao wrote: > Looks like the core of the problem should still be the juggling game of > consistency, availability and partition tolerance. If we want the cluster > still work when brokers have inconsistent information due to network > partition, we have to choose between consistency and availability. > My proposal is not about fix the message loss. Will share when ready. > Thanks Andrew. > > From: Andrew Grant > Sent: 21 November 2023 12:35 > To: dev@kafka.apache.org > Subject: Re: How Kafka handle partition leader change? > > Hey De Gao, > > Message loss or duplication can actually happen even without a leadership > change for a partition. For example if there are network issues and the > producer never gets the ack from the server, it’ll retry and cause > duplicates.
Message loss can usually occur when you use acks=1 config - > mostly you’d lose after a leadership change but in theory if the leader was > restarted, the page cache was lost and it stayed leader again we could lose > the message if it wasn’t replicated soon enough. > > You might be right it’s more likely to occur during leadership change > though - not 100% sure myself on that. > > Point being, the idempotent producer really is the way to write once and > only once as far as I’m aware. > > If you have any suggestions for improvements I’m sure the community would > love to hear them! It’s possible there are ways to make leadership changes > more seamless and at least reduce the probability of duplicates or loss. > Not sure myself. I’ve wondered before if the older leader could reroute > messages for a small period of time until the client knew the new leader > for example. > > Andrew > > Sent from my iPhone > > > On Nov 21, 2023, at 1:42 AM, De Gao wrote: > > > > I am asking this because I want to propose a change to Kafka. But looks > like in certain scenario it is very hard to not loss or duplication > messages. Wonder in what scenario we can accept that and where to draw the > line? > > > > > > From: De Gao > > Sent: 21 November 2023 6:25 > > To: dev@kafka.apache.org > > Subject: Re: How Kafka handle partition leader change? > > > > Thanks Andrew. Sounds like the leadership change from Kafka side is a > 'best effort' to avoid message duplicate or loss. Can we say that message > lost is very likely during leadership change unless producer uses > idempotency? Is this a generic situation that no intent to provide data > integration guarantee upon metadata change? > > > > From: Andrew Grant > > Sent: 20 November 2023 12:26 > > To: dev@kafka.apache.org > > Subject: Re: How Kafka handle partition leader change? > > > > Hey De Gao, > > > > The controller is the one that always elects a new leader. 
When that > happens that metadata is changed on the controller and once committed it’s > broadcast to all brokers in the cluster. In KRaft this would be via a > PartitionChange record that each broker will fetch from the controller. In > ZK it’d be via an RPC from the controller to the broker. > > > > In either case each broker might get the notification at a different > time. No ordering guarantee among the brokers. But eventually they’ll all > know the new leader which means eventually the Produce will fail with > NotLeader and the client will refresh its metadata and find out the new one. > > > > In between all that leadership movement, there are various ways messages > can get duplicated or lost. However if you use the idempotent producer I > believe you actually won’t see dupes or missing messages so if that’s an > important requirement you could look into that. The producer is designed to > retry in general and when you use the idempotent producer some extra > metadata is sent around to dedupe any messages server-side that were sent > multiple times by the client. > > > > If you’re
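The server-side deduplication described in this thread can be illustrated with a deliberately simplified Java model: the broker remembers the last sequence number it appended for each producer id and does not append anything at or below it again. The class is hypothetical; the real broker also tracks the producer epoch and keeps per-partition producer state that survives leader changes, none of which is modelled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of idempotent-producer deduplication on one partition.
final class IdempotentPartitionModel {
    enum Result { APPENDED, DUPLICATE, OUT_OF_SEQUENCE }

    // Last sequence number appended for each producer id (absent means nothing appended yet).
    private final Map<Long, Integer> lastSequence = new HashMap<>();
    private int appendedCount = 0;

    Result append(long producerId, int sequence) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;       // a retry of a record already in the log: not appended again
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_SEQUENCE; // a gap: an earlier record never arrived
        }
        lastSequence.put(producerId, sequence);
        appendedCount++;
        return Result.APPENDED;
    }

    int appendedCount() {
        return appendedCount;
    }
}
```

For example, if the ack for sequence 0 from producer 7 is lost and the producer retries, the second `append(7L, 0)` returns `DUPLICATE` and the log still holds a single copy of the record.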
Re: [DISCUSS] How to detect (and prevent) complex bugs in Kafka?
I think it is a great technique. I've used local invariants when modelling systems in Jepsen Maelstrom, which has no global view of state with which to check global invariants. Sometimes the kind of assertions you want could be too costly to include in a production system, so the idea of gating them behind a kind of debug mode could be useful. Low-cost assertions should probably be included regardless.

I'm not a Kafka code contributor, so I can't comment on using this technique to avoid the incorrect usage of threads and locks. However, there is also another concept which could potentially be applied to Kafka as a general coding principle: the Poka Yoke [1]. The idea of the Poka Yoke is to avoid mistakes by mistake-proofing, making human error physically much harder. So we have ways of preventing these kinds of mistakes through mechanisms such as types, and ways of quickly detecting them once written, in the form of assertions (local invariants).

[1] https://en.wikipedia.org/wiki/Poka-yoke

Jack

On Tue, Oct 24, 2023 at 11:33 AM Divij Vaidya wrote: > Hey folks > > We recently came across a bug [1] which was very hard to detect during > testing and easy to introduce during development. I would like to kick > start a discussion on potential ways which could avoid this category of > bugs in Apache Kafka. > > I think we might want to start working towards a "debug" mode in the broker > which will enable assertions for different invariants in Kafka. Invariants > could be derived from formal verification that Jack [2] and others have > shared with the community earlier AND from tribal knowledge in the > community such as network threads should not perform any storage IO, files > should not fsync in critical product path, metric gauges should not acquire > a lock etc.
The release qualification process (system tests + integration > tests) will run the broker in "debug" mode and will validate these > assertions while testing the system in different scenarios. The inspiration > for this idea is derived from Marc Brooker's post at > https://brooker.co.za/blog/2023/07/28/ds-testing.html > > Your thoughts on this topic are welcome! Also, please feel free to take > this idea forward and draft a KIP for a more formal discussion. > > [1] https://issues.apache.org/jira/browse/KAFKA-15653 > [2] https://lists.apache.org/thread/pfrkk0yb394l5qp8h5mv9vwthx15084j > > -- > Divij Vaidya >
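One way to combine the ideas in this thread, cheap assertions that are always on and costly ones gated behind a debug mode, is sketched below in Java. The system property `kafka.debug.invariants` and the network-thread marker are hypothetical and do not exist in Kafka; the "no storage IO on network threads" rule is taken from Divij's list of examples.

```java
import java.util.function.BooleanSupplier;

// Hypothetical invariant helper: cheap checks always run, costly ones only in "debug" mode.
final class Invariants {
    // Hypothetical switch, e.g. -Dkafka.debug.invariants=true (not a real Kafka property).
    static final boolean DEBUG = Boolean.getBoolean("kafka.debug.invariants");

    private static final ThreadLocal<Boolean> NETWORK_THREAD = ThreadLocal.withInitial(() -> Boolean.FALSE);

    private Invariants() {}

    // Low-cost assertion: always evaluated, even in production.
    static void require(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException("Invariant violated: " + message);
        }
    }

    // Costly assertion: the supplier is only evaluated when debug mode is on.
    static void debugRequire(BooleanSupplier condition, String message) {
        if (DEBUG && !condition.getAsBoolean()) {
            throw new IllegalStateException("Invariant violated: " + message);
        }
    }

    // Called once by each network thread when it starts.
    static void markNetworkThread() {
        NETWORK_THREAD.set(Boolean.TRUE);
    }

    // Example local invariant: storage IO must not happen on a network thread.
    static void requireNotNetworkThread(String operation) {
        debugRequire(() -> !NETWORK_THREAD.get(), operation + " performed on a network thread");
    }
}
```

Passing a `BooleanSupplier` means an expensive check, such as walking a data structure, is not evaluated at all when debug mode is off; the only cost left is a branch on a constant.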
Re: [DISCUSS] KIP-932: Queues for Kafka
I would like to see more explicit discussion of topic retention and share groups. There are a few options here, from simple to more sophisticated, and there are both topic-level and share-group-level options.

The simple thing would be to ensure that the SPSO of each share group is bounded by the Log Start Offset (LSO) of each partition, which itself is managed by the retention policy. This is a topic-level control which applies to all share groups. I would say that this shared retention is the largest drawback of modeling queues on shared logs, and it is worth noting.

A more sophisticated approach would be to allow the LSO to advance not only through the retention policy but also as the lowest SPSO advances. This can keep the amount of data lower by garbage collecting messages that have been acknowledged by all share groups. Some people may like that behaviour on topics where share groups are the only consumption model and no replay is needed.

There are also per-share-group possibilities, such as share-group TTLs, where messages can be archived on a per-share-group basis.

Thanks
Jack
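The two topic-level options can be expressed as a pair of small functions. This is a hypothetical Java sketch, not part of KIP-932: `boundSpso` applies the simple rule (an SPSO never falls below the log start offset), and `nextLogStartOffset` applies the more sophisticated rule, letting the log start offset move up to the lowest SPSO across all share groups, since every record below that offset has been acknowledged by every group.

```java
import java.util.List;

// Hypothetical helpers for the two topic-level retention options (offsets for one partition).
final class ShareGroupRetention {
    private ShareGroupRetention() {}

    // Simple option: retention moves the log start offset, and each SPSO is dragged along with it.
    static long boundSpso(long spso, long logStartOffset) {
        return Math.max(spso, logStartOffset);
    }

    // Sophisticated option: the log start offset may also advance to the lowest SPSO of all share groups.
    // With no share groups, only the retention policy applies.
    static long nextLogStartOffset(long retentionLogStartOffset, List<Long> spsos) {
        long lowestSpso = spsos.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(retentionLogStartOffset);
        return Math.max(retentionLogStartOffset, lowestSpso);
    }
}
```

For example, with retention alone at offset 50 and two share groups at SPSOs 120 and 90, `nextLogStartOffset(50, List.of(120L, 90L))` returns 90: offsets 50 to 89 can be garbage collected early, while the slower group still sees everything from 90 onwards. This is also why the option only suits topics where no other consumer needs to replay that data.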
Re: Complete Kafka replication protocol description
Hi all,

I agree that we should have the protocol description and specification in the Kafka repository. I have two other specifications to contribute, including the KRaft implementation of Raft and the next-gen consumer group protocol (WIP). I also have a formal prose description of the Kafka replication protocol as it works today. That should appear under the 3.5 directory sometime this week.

Regarding moving this kind of material into the Kafka repo, there are a number of additional topics, such as:
- Where to place formal prose descriptions and specifications of protocols in the Kafka repo.
- Given we may add more specifications and potentially other formal prose descriptions, how should we structure things?
- Do we use Markdown, as I have done here with SVG files (created in Excalidraw, with the Excalidraw data embedded in the SVG), or do we use LaTeX? Some people may wish to read this directly on GitHub, while others may prefer a PDF. Markdown is more accessible and perhaps more likely to fulfil the living-document approach.

Thanks
Jack

On Mon, Sep 11, 2023 at 1:45 PM Divij Vaidya wrote: > This is very useful Jack! > > 1. We are missing a replication protocol specification in our project. > Ideally it should be a living document and adding what you wrote to > the docs/design would be a great start towards that goal. > 2. I have also been building on top of some existing TLA+ to add > changes to replication protocol brought by features such as Tiered > Storage, local retention etc. at > > https://github.com/divijvaidya/kafka-specification/blob/master/KafkaReplication.tla > 3. Apart from verifying correctness, I believe a TLA+ model will also > help developers quickly iterate through their fundamental assumptions > while making changes. As an example, we recently had a discussion in > the community on whether to increase leader epoch with shrink/expand > ISR or not.
There was another discussion on whether we can choose the > replica with the largest end offset as the new leader to reduce > truncation. In both these cases, we could have quickly modified the > existing TLA+ model, run through it and verify that the assumptions > still hold true. It would be great if we can take such discussions as > an example and demonstrate how TLA+ could have benefitted the > community. It would help make the case for adding the TLA+ spec as > part of the community owned project itself. > > Right now, things are a bit busy on my end, but I am looking forward > to exploring what you shared above in the coming weeks (perhaps a > first review by end of sept). > > Thank you again for starting this conversation. > > -- > Divij Vaidya > > On Mon, Sep 11, 2023 at 4:49 AM Haruki Okada wrote: > > > > Hi Jack, > > > > Thank you for the great work, not only the spec but also for the > > comprehensive documentation about the replication. > > Actually I wrote some TLA+ spec to verify unclean leader election > behavior > > before so I will double-check my understanding with your complete spec :) > > > > > > Thanks, > > > > On Sun, Sep 10, 2023 at 21:42, David Jacot wrote: > > > > > Hi Jack, > > > > > > This is great! Thanks for doing it. I will look into it when I have a > bit > > > of time, likely after Current. > > > > > > Would you be interested in contributing it to the main repository? That > > > would be a great contribution to the project. Having it there would > allow > > > the community to maintain it while changes to the protocol are made. > That > > > would also pave the way for having other specs in the future (e.g. new > > > consumer group protocol). > > > > > > Best, > > > David > > > > > > On Sun, Sep 10, 2023 at 12:45, Jack Vanlightly wrote:
> > > > > > > Hi all, > > > > > > > > As part of my work on formally verifying different parts of Apache > Kafka > > > > and working on KIP-966 I have built up a lot of knowledge about how > the > > > > replication protocol works. Currently it is mostly documented across > > > > various KIPs and in the code itself. I have written a complete > protocol > > > > description (with KIP-966 changes applied) which is inspired by the > > > precise > > > > but accessible style and language of the Raft paper. The idea is > that it > > > > makes it easier for contributors and anyone else interested in the > > > protocol > > > > to learn how it works, the fundamental properties it has and how > those > > > > properties are supported by the various behaviors and conditions. > > > > > > > > It currently resides next to the TLA+ specification itself in my > > > > kafka-tlaplus repository. I'd be interested to receive feedback from > the > > > > community. > > > > > > > > > > > > > > > > https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/kip-966/description/0_kafka_replication_protocol.md > > > > > > > > Thanks > > > > Jack > > > > > > > > > > > > > -- > > > > Okada Haruki > > ocadar...@gmail.com > > >
Complete Kafka replication protocol description
Hi all, As part of my work on formally verifying different parts of Apache Kafka and working on KIP-966 I have built up a lot of knowledge about how the replication protocol works. Currently it is mostly documented across various KIPs and in the code itself. I have written a complete protocol description (with KIP-966 changes applied) which is inspired by the precise but accessible style and language of the Raft paper. The idea is that it makes it easier for contributors and anyone else interested in the protocol to learn how it works, the fundamental properties it has and how those properties are supported by the various behaviors and conditions. It currently resides next to the TLA+ specification itself in my kafka-tlaplus repository. I'd be interested to receive feedback from the community. https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/kip-966/description/0_kafka_replication_protocol.md Thanks Jack
Re: [DISCUSS] KIP-966: Eligible Leader Replicas
Hi Calvin,

Regarding partition reassignment, I have two comments.

I notice the KIP says "The AlterPartitionReassignments will not change the ELR"; however, when a reassignment completes (or reverts), any replicas removed from the replica set would be removed from the ELR. It sounds obvious, but I figured we should be explicit about that.

Reassignment should also respect min.insync.replicas, because currently a reassignment can complete as long as the ISR is not empty and all added replicas are ISR members. However, my TLA+ specification, which now includes reassignment, finds single-broker failures that can cause committed data loss, despite the added protection of the ELR and min.insync.replicas=2. These scenarios are limited to shrinking the size of the replica set. If we modify the PartitionChangeBuilder to add the completion condition that the size of the target ISR is >= min.insync.replicas, then that closes this last single-broker-failure data loss case.

With the above modification, the TLA+ specification of the ELR part of the design is standing up to all safety and liveness checks. The only thing that is not modeled is unclean recovery, though I may leave that out as the specification is already very large.

Jack

On 2023/09/01 22:27:10 Calvin Liu wrote: > Hi Justine > 1. With the new proposal, in order to let the consumer consume a message > when only 1 replica commits it to its log, the min ISR has to be set to 1. > 2. Yes, we will need an unclean recovery if the leader has an unclean > shutdown. > 3. If the min ISR config is changed to a larger value, the ISR and ELR will > not be updated. ISR members are always valid no matter how min ISR changes. > If ELR is not empty, then the HWM can't advance as well after the min ISR > increase, so the ELR members are safe to stay. > 4. I will highlight the explanation. Thanks. > > On Thu, Aug 31, 2023 at 4:35 PM Justine Olshan > wrote: > > > Hey Calvin, > > > > Thanks for the responses.
I think I understood most of it, but had a few > > follow up questions > > > > 1. For the acks=1 case, I was wondering if there is any way to continue > > with the current behavior (ie -- we only need one ack to produce to the log > > and consider the request complete.) My understanding is that we can also > > consume from such topics at that point. > > If users wanted this lower durability could they set min.insync.replicas to > > 1? > > > > 2. For the case where we elect a leader that was unknowingly offline. Say > > this replica was the only one in ELR. My understanding is that we would > > promote it to ISR and remove it from ELR when it is the leader, but then we > > would remove it from ISR and have no brokers in ISR or ELR. From there we > > would need to do unclean recovery right? > > > > 3. Did we address the case where dynamically min isr is increased? > > > > 4. I think my comment was more about confusion on the KIP. It was not clear > > to me that the section was describing points if one was done before the > > other. But I now see the sentence explaining that. I think I skipped from > > "delivery plan" to the bullet points. > > > > Justine > > > > On Thu, Aug 31, 2023 at 4:04 PM Calvin Liu > > wrote: > > > > > Hi Justine > > > Thanks for the questions! > > > *a. For my understanding, will we block replication? Or just the high > > > watermark advancement?* > > > - The replication will not be blocked. The followers are free to > > > replicate messages above the HWM. Only HWM advancement is blocked. > > > > > > b. *Also in the acks=1 case, if folks want to continue the previous > > > behavior, they also need to set min.insync.replicas to 1, correct?* > > > - If the clients only send ack=1 messages and minISR=2. The HWM > > behavior > > > will only be different when there is 1 replica in the ISR. In this case, > > > the min ISR does not do much in the current system. It is kind of a > > > trade-off but we think it is ok. > > > > > > c. 
*The KIP seems to suggest that we remove from ELR when we start up > > > again and notice we do not have the clean shutdown file. Is there a > > chance > > > we have an offline broker in ELR that had an unclean shutdown that we > > elect > > > as a leader before we get the chance to realize the shutdown was > > unclean?* > > > * - *The controller will only elect an unfenced(online) replica as the > > > leader. If a broker has an unclean shutdown, it should register to the > > > controller first(where it has to declare whether it is a clean/unclean > > > shutdown) and then start to serve broker requests. So > > > 1. If the broker has an unclean shutdown before the controller is > > > aware that the replica is offline, then the broker can become the leader > > > temporarily. But it can't serve any Fetch requests before it registers > > > again, and that's when the controller will re-elect a leader. > > > 2. If the controller knows the replica is offline(missing heartbeats > > > from the broker for a while) before
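The extra reassignment completion condition proposed at the start of this thread, together with pruning removed replicas from the ELR, can be sketched in Java as below. This is a hypothetical model, not the actual PartitionChangeBuilder code: the "target ISR" is taken to be the current ISR restricted to the target replica set, and the existing rule is modelled as described in the email (ISR not empty and all added replicas are ISR members).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the reassignment completion check discussed for KIP-966.
final class ReassignmentCheck {
    private ReassignmentCheck() {}

    // The ISR that would remain once the partition moves to the target replica set.
    static Set<Integer> targetIsr(Set<Integer> isr, Set<Integer> targetReplicas) {
        Set<Integer> result = new HashSet<>(isr);
        result.retainAll(targetReplicas);
        return result;
    }

    // Rule as described today: target ISR non-empty and every added replica is in it.
    static boolean canCompleteToday(Set<Integer> isr, Set<Integer> targetReplicas, Set<Integer> addingReplicas) {
        Set<Integer> target = targetIsr(isr, targetReplicas);
        return !target.isEmpty() && target.containsAll(addingReplicas);
    }

    // Proposed rule: additionally require at least min.insync.replicas members in the target ISR.
    static boolean canComplete(Set<Integer> isr, Set<Integer> targetReplicas, Set<Integer> addingReplicas,
                               int minInsyncReplicas) {
        return canCompleteToday(isr, targetReplicas, addingReplicas)
                && targetIsr(isr, targetReplicas).size() >= minInsyncReplicas;
    }

    // Replicas removed by the reassignment also leave the ELR.
    static Set<Integer> elrAfterCompletion(Set<Integer> elr, Set<Integer> targetReplicas) {
        Set<Integer> result = new HashSet<>(elr);
        result.retainAll(targetReplicas);
        return result;
    }
}
```

Shrinking a replica set from {1, 2, 3} to {1} with min.insync.replicas=2 passes the current check (the target ISR {1} is non-empty) but fails the proposed one, which is the shrinking-replica-set class of scenario the TLA+ model flagged.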
Re: [DISCUSS] KIP-853: KRaft Voters Change
Hi Jose,

It's looking good!

> I think that when a replica has caught up is an implementation detail > and we can have this detailed discussion in Jira or the PR. What do > you think?

Yes, that sounds fine. For what it's worth, letting the leader decide when an observer has caught up greatly simplifies things, as the leader has all the information necessary.

Thanks
Jack
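The point that the leader is best placed to decide when an observer has caught up can be illustrated with a small Java sketch. The rule here (a recent fetch whose offset is within a fixed lag of the leader's log end offset) and every name and threshold in it are hypothetical; the thread leaves the exact criterion as an implementation detail.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical leader-side tracker: the leader sees every observer fetch, so it can judge catch-up.
final class ObserverCatchUpTracker {
    private final long maxLagOffsets;   // how far behind the leader's log end offset is acceptable
    private final long maxFetchAgeMs;   // how recent the observer's last fetch must be
    private final Map<Integer, Long> lastFetchOffset = new HashMap<>();
    private final Map<Integer, Long> lastFetchTimeMs = new HashMap<>();

    ObserverCatchUpTracker(long maxLagOffsets, long maxFetchAgeMs) {
        this.maxLagOffsets = maxLagOffsets;
        this.maxFetchAgeMs = maxFetchAgeMs;
    }

    // Record a Fetch from replicaId at fetchOffset, received at nowMs.
    void onFetch(int replicaId, long fetchOffset, long nowMs) {
        lastFetchOffset.put(replicaId, fetchOffset);
        lastFetchTimeMs.put(replicaId, nowMs);
    }

    // True if the observer fetched recently and is within maxLagOffsets of the leader.
    boolean isCaughtUp(int replicaId, long leaderEndOffset, long nowMs) {
        Long offset = lastFetchOffset.get(replicaId);
        if (offset == null) {
            return false; // has never fetched from this leader
        }
        boolean recent = nowMs - lastFetchTimeMs.get(replicaId) <= maxFetchAgeMs;
        return recent && leaderEndOffset - offset <= maxLagOffsets;
    }
}
```

Under this assumed rule, an AddVoter request for a replica could be accepted only once `isCaughtUp` returns true for it, so the observer-first approach discussed for KIP-853 would not need the observer to judge its own progress.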
Re: [DISCUSS]: Including TLA+ in the repo
+1 for me too. Once the KIP-853 is agreed I will make any necessary changes and submit a PR to the apache/kafka repo. Jack On Tue, Jul 26, 2022 at 10:10 PM Ismael Juma wrote: > I'm +1 for inclusion in the main repo and I was going to suggest the same > in the KIP-853 discussion. The original authors of 3 and 4 are part of the > kafka community, so we can ask them to submit PRs. > > Ismael > > On Tue, Jul 26, 2022 at 7:58 AM Tom Bentley wrote: > > > Hi, > > > > I noticed that TLA+ has featured in the Test Plans of a couple of recent > > KIPs [1,2]. This is a good thing in my opinion. I'm aware that TLA+ has > > been used in the past to prove properties of various parts of the Kafka > > protocol [3,4]. > > > > The point I wanted to raise is that I think it would be beneficial to the > > community if these models could be part of the main Kafka repo. That way > > there are fewer hurdles to their discoverability and it makes it easier > for > > people to compare the implementation with the spec. Spreading familiarity > > with TLA+ within the community is also a potential side-benefit. > > > > I notice that the specs in [4] are MIT-licensed, but according to the > > Apache 3rd party license policy [5] it should be OK to include. > > > > Thoughts? > > > > Kind regards, > > > > Tom > > > > [1]: > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-TestPlan > > [2]: > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes#KIP853:KRaftVoterChanges-TestPlan > > [3]: https://github.com/hachikuji/kafka-specification > > [4]: > > > > > https://github.com/Vanlightly/raft-tlaplus/tree/main/specifications/pull-raft > > [5]: https://www.apache.org/legal/resolved.html > > >
Re: [DISCUSS] KIP-853: KRaft Voters Change
Hi Jose,

Thanks for getting this started. Some comments:

- Regarding the removal of voters: when a leader appends a RemoveVoterRecord to its log, it immediately switches to the new configuration. There are two cases here:
1. The voter being removed is the leader itself. The KIP documents that the followers will continue to fetch from the leader despite these followers learning that the leader has been removed from the voter set. This is correct. It might be worth stating the reason: otherwise the cluster can get blocked from making further progress, because the followers must keep fetching from the leader for the leader to commit the record and resign.
2. The voter being removed is not the leader. We should document that the leader will accept fetches with replica ids (i.e. not observer fetches) from followers who are not in the leader's voter set. This will occur because the voter being removed won't have received the RemoveVoterRecord yet, and it must be allowed to reach this record so that: a) the RemoveVoterRecord can be committed (this voter may actually be required in order to reach quorum on this record) and b) the voter being removed can switch out of voter mode.

- Regarding what happens when a voter leaves the voter set: when a non-leader voter reaches a RemoveVoterRecord where it is the subject of the removal, does it switch to being an observer and continue fetching? When a leader is removed, it carries on until it has committed the record or an election occurs electing a different leader. Until that point, it is leader but not a voter, so does that make it an observer? After it has committed the record and resigns (or gets deposed by an election), does it then start fetching as an observer?

- I think the Add/RemoveVoterRecords should also include the current voter set. This will make it easier to code against and also make troubleshooting easier. Otherwise voter membership can only be reliably calculated by replaying the whole log.
- Regarding the adding of voters:
1. We should document the problem of adding a new voter which then causes all progress to be blocked until the voter catches up with the leader. For example, in a 3-node cluster, we lose 1 node. We add a new node, which means we now have 4 voters and a majority of 3, with only 3 functioning nodes. Until the new node has caught up, the high watermark cannot advance. This can be solved by ensuring that, to add a node, we start it first as an observer and, once it has caught up, send the AddVoter RPC to the leader. This leads to the question of how an observer determines that it has caught up.
2. Perhaps an administrator should send a Join RPC to the node we want to add. It will, if it isn't already, start fetching as an observer and automatically send the AddVoter RPC to the leader itself. This way, from a human operations point of view, there is only a single action to perform.

- For any of the community who understand or are interested in TLA+, should we include the provisional TLA+ specification for this work? https://github.com/Vanlightly/raft-tlaplus/blob/main/specifications/pull-raft/KRaftWithReconfig.tla

Thanks
Jack

On Thu, Jul 21, 2022 at 7:38 PM Justine Olshan wrote: > Hey Jose -- this seems like an important KIP! And I enjoy seeing more Uuid > usage :) > I was curious a bit more about the motivation here. That section seems to > be missing. > > Thanks for sharing the KIP! > Justine > > On Thu, Jul 21, 2022 at 10:30 AM Niket Goel > wrote: > > > Hey Jose, > > > > Thanks for the KIP. This is a good improvement and will make the KRaft > > implementation much more robust in the face of failures and generally > make > > it more flexible for users. > > > > I did a first pass through the KIP and here are some comments (some of > > these might just be a little uninformed, so feel free to direct me to > > supplemental reading): > > Overall protocol safety wise, the reconfiguration operations look safe.
> > > > > This UUID will be generated once and persisted as part of the quorum > > state for the topic partition > > Do we mean that it will be generated every time the disk on the replica > is > > primed (so every disk incarnation will have UUID). I think you describe > it > > in a section further below. Best to pull it up to here — “the replica > uuid > > is automatically generated once by the replica when persisting the quorum > > state for the first time.” > > > > > If there are any pending voter change operations the leader will wait > > for them to finish. > > Will new requests be rejected or queued up behind the pending operation. > > (I am assuming rejected, but want to confirm) > > > > > When this option is used the leader of the KRaft topic partition will > > not allow the AddVoter RPC to add replica IDs that are not describe in > the > > configuration and it would not allow the RemoveVoter RPC to remove > replica > > IDs that are described in the configuration. > > Bootstrapping is a little tricky I think. Is it safer/simpler
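The blocking effect of adding a voter before it has caught up can be checked with a little arithmetic. The Java sketch below is hypothetical and simplified: it computes the high watermark as the largest offset reached by a majority of voters, treats a voter with no known offset as being at 0, and never lets the high watermark move backwards. It ignores other KRaft rules, such as the leader needing to commit a record from its own epoch, and the offsets in the usage example are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified high-watermark calculation over a voter set.
final class VoterHighWatermark {
    private VoterHighWatermark() {}

    static int majority(int voterCount) {
        return voterCount / 2 + 1;
    }

    // Largest offset that at least a majority of voters have replicated.
    static long candidate(Set<Integer> voters, Map<Integer, Long> endOffsets) {
        List<Long> offsets = new ArrayList<>();
        for (int voter : voters) {
            offsets.add(endOffsets.getOrDefault(voter, 0L));
        }
        offsets.sort(Collections.reverseOrder());
        return offsets.get(majority(voters.size()) - 1);
    }

    // The high watermark only moves forward.
    static long next(long currentHighWatermark, Set<Integer> voters, Map<Integer, Long> endOffsets) {
        return Math.max(currentHighWatermark, candidate(voters, endOffsets));
    }
}
```

With voters {1, 2, 3}, node 3 dead at offset 900 and nodes 1 and 2 at 1100, the candidate is 1100 (a majority of 2). Add voter 4 at offset 0 and there are 4 voters with a majority of 3: the sorted offsets are 1100, 1100, 900, 0, so the candidate is 900 and a high watermark of 1000 stays at 1000 until node 4 reaches 1100.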
Re: [DISCUSS] KIP-782: Expandable batch size in producer
The current configs are hard to use for the Kafka user and a little inflexible, so I am pleased to see the discussion. Ultimately we want flexibility. We don't want to force users to understand the underlying implementation/protocol, and we want the producer to handle high or low throughput efficiently, with the right amount of in-flight data for the conditions. Given that each producer by default has a limited number of in-flight requests per connection, we need extra flexibility in the batch sizing.

The way I would like to be able to approach this as a user is:
1) I use linger.ms to bound my client-side produce latency.
2) I use request.size and max.in.flight.requests.per.connection to calculate my upper-bound in-flight data.
As a user, I don't have to think about anything else (no edge cases or caveats). The producer handles the batching intelligently, trying to fully utilise the request sizes while respecting the linger.

I understand Jun's point about increasing the default batch size. The only downside I see is having to juggle the request vs batch size to avoid delaying batches, because we can only fit so many big batches in a request. That is a calculation we force on the user, and it reduces flexibility.

As I understand the proposed change, each ProducerBatch would be physically made up of a list of ByteBuffers, which allows for dynamically increasing the batch size in increments of batch.initial.size. The documented benefit is that we can send large batches when there is lots of data ready to be sent, while also not having to wait for linger.ms when there is less data. Comparing this to Jun's suggested approach, it looks the same except that we get size-triggered batches instead of linger-triggered ones when throughput is low. How big an advantage is that?

An alternative dynamic sizing strategy is one based on fairness: batches are dynamically sized so that the data of each partition is distributed more fairly across the request.
Messages are added to small sub-batches (say 4kb or 16kb) in the accumulator. When draining the sub-batches in the Sender, the logic repeatedly selects sub-batches in creation order until a constraint is met (like request.size). All sub-batches of the same partition are grouped into a single batch for sending. This way the user can set a high batch.size (it could even default to the request size), as the final batches will be fairly distributed across partitions and we should get good utilisation of request.size while still respecting linger.

Example with request.size=128kb, 16kb sub-batches, 1 broker, and sub-batches picked in creation order. For simplicity, none of the sub-batches have reached linger in this example.

Accumulator
p0: [sb1{t1},sb2{t2},sb3{t3},sb4{t4},sb5{t5},sb6{t6},sb7{t7},sb8{t8}]
p1: [sb1{t1}]
p2: [sb1{t3}]
p3: [sb1{t7}]

Request 1
p0: [sb1-sb5]
p1: [sb1]
p2: [sb1]

Request 2 (with no additional data having arrived)
p0: [sb6-sb8]
p3: [sb1]

The downside is that this requires a decent amount of refactoring. For example, the housekeeping of how callbacks are managed would need to change. There might be more impact that I am not aware of; I'm fairly new to this code.

Thanks,
Jack

On 2021/12/13 19:15:37 Jun Rao wrote:
> Hi, Lucas,
>
> Thanks for the reply. It would be useful to summarize the benefits of a separate batch.max.size. To me, it's not clear why a user would want two different batch sizes. In your example, I can understand why a user would want to form a batch with a 5ms linger. But why would a user prefer 16K batches with 5ms linger, if say 256K is deemed best for throughput?
>
> Thanks,
>
> Jun
>
> On Fri, Dec 10, 2021 at 4:35 PM Lucas Bradstreet wrote:
>
> > Hi Jun,
> >
> > One difference compared to increasing the default batch size is that users may actually prefer smaller batches, but it makes much less sense to accumulate many small batches if a batch is already sending.
> >
> > For example, imagine a user that prefers 16K batches with 5ms linger.
> > Everything is functioning normally and 16KB batches are being sent. Then there's a 500ms blip for that broker. Do we want to continue to accumulate 16KB batches, each of which requires a round trip, or would we prefer to accumulate larger batches while sending is blocked?
> >
> > I'm not hugely against increasing the default batch.size in general, but batch.max.size does seem to have some nice properties.
> >
> > Thanks,
> >
> > Lucas
> >
> > On Fri, Dec 10, 2021 at 9:42 AM Jun Rao wrote:
> >
> > > Hi, Artem, Luke,
> > >
> > > Thanks for the reply.
> > >
> > > 11. If we get rid of batch.max.size and increase the default batch.size, it's true the behavior is slightly different than before. However, does that difference matter to most of our users? In your example, if a user sets linger.ms to 100ms and thinks 256KB is good for throughput, does it matter to deliver any batch smaller than 256KB before 100ms? I also find it a bit hard to explain