Hi Jun,

Thanks for the feedback. Excuse the delay; it took me a while to
properly address your detailed feedback. See my comments below.

I am going to update the KIP as outlined in this email. I will send
another email when I have made all of the changes.

On Fri, Feb 2, 2024 at 10:54 AM Jun Rao <j...@confluent.io.invalid> wrote:
> 10. kraft.version: Functionality wise, this seems very similar to
> metadata.version, which is to make sure that all brokers/controllers are on
> a supported version before enabling a new feature. Could you explain why we
> need a new one instead of just relying on metadata.version?

Yes, they are trying to solve similar problems but they do so at
different abstraction layers. "metadata.version" uses
FeatureLevelRecord which is a "data" record. I am trying to design the
KRaft protocol (KafkaRaftClient implementation) to not assume a
specific application (metadata and the controller). To support
multiple applications possibly using KRaft means that we need to use
control records to define KRaft's behavior. Instead of using
FeatureLevelRecord which is a cluster metadata record, this KIP uses
KRaftVersionRecord which will be a control record.

The reason we need to consider observers (brokers) when
determining the kraft.version is snapshotting (compaction).
Since snapshots need to include all logical records in the log
segments and observers (brokers) maintain the log on disk, this means
that they need to be able to decode and encode both KRaftVersionRecord
and VotersRecord before the leader is allowed to write them.

> 11. Both the quorum-state file and controller.quorum.bootstrap.servers
> contain endpoints. Which one takes precedence?

I updated the KIP (section "Reference explanation / Endpoints
information") after Jack's email. KRaft is going to keep two sets of
endpoints.

1. Voters set. This set of endpoints is used by voters to establish
leadership with the Vote, BeginQuorumEpoch and EndQuorumEpoch RPCs.
The precedence order is the VotersRecord in the snapshot and log. If
there are no VotersRecords in the snapshot or log, it will use the
configuration in controller.quorum.voters.

2. Bootstrap servers. This is mainly used by observers (brokers) to
discover the leader through Fetch RPCs. The precedence order is
controller.quorum.bootstrap.servers first, controller.quorum.voters
second. Voters won't use this property as they discover the leader and
its endpoint from the BeginQuorumEpoch RPC from the leader.
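
To make the two precedence orders concrete, here is a minimal Python
sketch. The function and parameter names are hypothetical and are not
part of the KIP or the Kafka code base, and endpoints are treated as
opaque strings.

```python
from typing import List, Optional, Sequence


def resolve_voter_endpoints(
    voters_record: Optional[Sequence[str]],
    quorum_voters_config: Sequence[str],
) -> List[str]:
    """Voters set: the latest VotersRecord from the snapshot and log wins.

    voters_record is None when neither the snapshot nor the log contains a
    VotersRecord; only then is controller.quorum.voters used.
    """
    if voters_record is not None:
        return list(voters_record)
    return list(quorum_voters_config)


def resolve_bootstrap_servers(
    bootstrap_servers_config: Sequence[str],
    quorum_voters_config: Sequence[str],
) -> List[str]:
    """Bootstrap servers: controller.quorum.bootstrap.servers first,
    controller.quorum.voters second."""
    if bootstrap_servers_config:
        return list(bootstrap_servers_config)
    return list(quorum_voters_config)
```

For example, an observer with an empty
controller.quorum.bootstrap.servers would fall back to the endpoints
listed in controller.quorum.voters.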

I believe that the original intent of the voters in quorum state file
was as a cache of the controller.quorum.voters configuration and to
identify invalid changes in the configuration. I was trying to keep
this functionality in the new version of the quorum state file.

The more I think about the implementation, the less I think this is
useful or even implementable. KRaft needs to keep all voters sets from
the latest snapshot to the LEO so that it could include the correct
voters set when generating a snapshot. I am going to update the KIP to
remove voter information from the quorum state file.

> 12. It seems that downgrading from this KIP is not supported? Could we have
> a section to make it explicit?

Yes. Downgrades will not be supported. I updated the "Compatibility,
deprecation and migration plan" section. There is a sentence about this
in the
"Public interfaces / Command line interface / kafka-features" section.
I'll also update the "Proposed changes / Reference explanation /
Supported features" and "Public interfaces / RPCs / UpdateFeatures /
Handling" sections.

> 13. controller.quorum.auto.join.enable: If this is set true, when does the
> controller issue the addVoter RPC? Does it need to wait until it's caught
> up? Does it issue the addVoter RPC on every restart?

The controller will monitor the voters set. If the controller finds
that the voters set doesn't contain itself, it will send an AddVoter
RPC to the leader. To avoid the quorum adding a node and becoming
unavailable, I will change the handling of the AddVoter RPC to not
allow duplicate replica ids. If there is any replica with the same id
but a different uuid, the old uuid must be removed first (with the
RemoveVoter RPC) before the new uuid can be added.

Here is an example that shows how a KRaft partition can become
unavailable if we allow automatically adding duplicate replica ids.
Assume that the partition starts with voters v1 and v2. The operator
starts controller 3 as v3 (replica id 3, some generated replica uuid)
and it attempts to automatically join the cluster by sending the
AddVoter RPC. Assume that the voter set (v1, v2, v3) gets committed by
v1 and v2. Controller 3 restarts with a new disk as v3' and sends an
AddVoter RPC. Because (v1, v2, v3) was committed, the leader is able to
change the quorum to (v1, v2, v3, v3') but won't be able to commit it
if controller 3 restarts again and comes back with a new disk as
v3''. At that point only two of the four voters (v1 and v2) remain,
which is less than a majority.

This case is avoided if the KRaft leader rejects any AddVoter RPC that
duplicates the replica id.
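
The example above can be sketched in a few lines of Python. This is
only an illustration: the helper names are hypothetical, voters are
modeled as (replica id, replica uuid) pairs, and I am assuming that a
voters set is committed once a strict majority of its members is
online and replicating.

```python
from typing import FrozenSet, Tuple

Voter = Tuple[int, str]  # (replica id, replica uuid)


def has_majority(voters: FrozenSet[Voter], online: FrozenSet[Voter]) -> bool:
    """True when a strict majority of the voters set is online."""
    return len(voters & online) > len(voters) // 2


def add_voter(voters: FrozenSet[Voter], new_voter: Voter) -> FrozenSet[Voter]:
    """Return the new voters set, rejecting any duplicate replica id."""
    new_id, _ = new_voter
    if any(replica_id == new_id for replica_id, _ in voters):
        raise ValueError(f"replica id {new_id} is already in the voters set")
    return voters | {new_voter}


committed = frozenset({(1, "uuid-1"), (2, "uuid-2"), (3, "uuid-3")})
# Controller 3 has lost its disk twice, so only v1, v2 and v3'' are online.
online = frozenset({(1, "uuid-1"), (2, "uuid-2"), (3, "uuid-3-second-restart")})

# Without the check, the quorum (v1, v2, v3, v3') cannot reach a majority.
unsafe = committed | {(3, "uuid-3-first-restart")}
stuck = not has_majority(unsafe, online)

# With the check, the committed set is left unchanged and stays available.
still_available = has_majority(committed, online)
```

With the duplicate check in place the AddVoter call for v3' raises an
error, and the operator has to remove v3 with RemoveVoter before
adding the replacement.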

> 14. "using the AddVoter RPC, the Admin client or the kafka-metadata-quorum
> CLI.": In general, the operator doesn't know how to make RPC calls. So the
> practical options are either CLI or adminClient.

Sure. I can remove the explicit mention of using AddVoter RPC. I
mentioned it because 3rd party libraries for Kafka could also
implement the client side of the AddVoter RPC.

> 15. VotersRecord: Why does it need to include name and SecurityProtocol in
> EndPoints? It's meant to replace controller.quorum.voters, which only
> includes host/port.

I can remove the security protocol but we should keep the name. This
would allow the local replica to look up the security protocol in the
security protocol map to negotiate the connection to the remote node.

The current implementation always assumes that all of the endpoints in
controller.quorum.voters expect the security protocol of the first
controller listener. This is difficult to satisfy if the operator
tries to change the security protocol of the controllers while
remaining available.

Assume the following remote voter:
[ {name: CONTROLLER_SSL, host: controller-0, port: 9000}, {name:
CONTROLLER, host: controller-0, port: 9001} ]

Also assume that the local controller has the configuration for the
CONTROLLER listener and not the CONTROLLER_SSL listener because the
local node has yet to be configured to use SSL. In this case we want
the local replica to be able to connect to the remote replica using
CONTROLLER instead of CONTROLLER_SSL.
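
A minimal Python sketch of that lookup follows. The function name is
hypothetical, the local map stands in for a listener name to security
protocol mapping such as listener.security.protocol.map, and picking
the first remote listener that the local replica knows about is my
assumption rather than something the KIP specifies.

```python
from typing import Dict, List, Optional


def pick_listener(
    remote_listeners: List[dict],
    local_protocol_map: Dict[str, str],
) -> Optional[dict]:
    """Return the first remote listener whose name is configured locally."""
    for listener in remote_listeners:
        if listener["name"] in local_protocol_map:
            return listener
    return None  # No common listener name: the replica cannot connect.


remote_voter = [
    {"name": "CONTROLLER_SSL", "host": "controller-0", "port": 9000},
    {"name": "CONTROLLER", "host": "controller-0", "port": 9001},
]
# The local node has not been configured for CONTROLLER_SSL yet.
local_map = {"CONTROLLER": "PLAINTEXT"}

chosen = pick_listener(remote_voter, local_map)
```

Here the local replica ends up connecting to controller-0:9001 using
the protocol it has configured for the CONTROLLER listener.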

> 16. "The KRaft leader cannot do this for observers (brokers) since their
> supported versions are not directly known by the KRaft leader."
> Hmm, the KRaft leader receives BrokerRegistrationRequest that includes
> supported feature versions, right?

Yeah. I should define my terms better. I'll update the glossary
section. When I mention KRaft (KRaft leader, voter, etc.), I am
referring to the KRaft protocol and the KafkaRaftClient implementation
and not the controller state machine that is implemented using KRaft.

As you point out, the controller does know about all of the registered
brokers through BrokerRegistrationRequest. The following sentence
mentions that the controller state machine will make this information
available to the KRaft implementation (KafkaRaftClient): "The
controller state machine will instead push the brokers' kraft.version
information to the KRaft client." The controller will push this
information down to the RaftClient because the KRaft implementation
doesn't know how to decode the application's (cluster metadata)
RegisterBrokerRecord.

> 17. UpdateVoter:
> 17.1 "The leader will learn the range of supported version from the
> UpdateVoter RPC".
> KIP-919 introduced ControllerRegistrationRequest to do that. Do we need a
> new one?

They are similar but there are three differences that led me to
add this new RPC.
1. They are implemented at two different layers of the protocol. The
Kafka controller is an application of the KRaft protocol. I wanted to
keep this distinction in this design. The controller API is going to
forward ControllerRegistrationRequest to the QuorumController and it
is going to forward UpdateVoter to the KafkaRaftClient.
2. The semantics are different. ControllerRegistrationRequest is an
update and an insert (upsert) operation while UpdateVoter is only an
update operation. If the voter getting updated is not part of the
voter set the leader will reject the update.
3. The other semantic difference is that in KRaft, the voter set
(which includes the replica ids and the endpoints) is based on
uncommitted data, while the controller state machine only sees
committed data.
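
The second difference (upsert versus update only) can be illustrated
with a small Python sketch. Both functions and the dictionary layout
are hypothetical and only model the semantics described above, not
the actual request handlers.

```python
from typing import Dict


def register_controller(
    registrations: Dict[int, dict], controller_id: int, info: dict
) -> None:
    """Upsert: insert the registration or overwrite an existing one."""
    registrations[controller_id] = info


def update_voter(voters: Dict[int, dict], voter_id: int, info: dict) -> None:
    """Update only: reject the request when the replica is not a voter."""
    if voter_id not in voters:
        raise KeyError(f"replica {voter_id} is not in the voter set")
    voters[voter_id] = info
```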

> 17.2 Do we support changing the voter's endpoint dynamically? If not, it
> seems that can be part of ControllerRegistrationRequest too.

KRaft should allow for the endpoints of a remote replica to change
without having to restart the local replica. It is okay to require a
restart to change the endpoints of the local replica.

> 18. AddVoter
> 18.1 "This RPC can be sent to a broker or controller, when sent to a
> broker, the broker will forward the request to the active controller."
> If it's sent to a non-active controller, it will also be forwarded to the
> active controller, right?

I guess it could but I wasn't planning to implement that. We don't
have a forwarding manager from controller to active controller. We
only have that from brokers to controllers.

When the Admin client is connected to the controllers the admin client
will connect and send requests directly to the active controller. This
is implemented in the LeastLoadedBrokerOrActiveKController node
provider in the admin client.

> 18.2 Why do we need the name/security protocol fields in the request?
> Currently, they can be derived from the configs.
>     { "name": "Listeners", "type": "[]Listener", "versions": "0+",
>       "about": "The endpoints that can be used to communicate with the
> voter", "fields": [
>       { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
>         "about": "The name of the endpoint" },
>       { "name": "Host", "type": "string", "versions": "0+",
>         "about": "The hostname" },
>       { "name": "Port", "type": "uint16", "versions": "0+",
>         "about": "The port" },
>       { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
>         "about": "The security protocol" }
>     ]}

This is the listener name of the remote replica. The local replica
needs to know how to connect to the remote replica, which includes the
host, port and security protocol. The local replica can look up the
connection mechanism by using the listener name as long as the local
replica also has a configuration for that listener name.

I'll remove the security protocol from the Listener struct. We can add
it later if we find it helpful for debugging purposes.

> 18.3 "4. Send an ApiVersions RPC to the first listener to discover the
> supported kraft.version of the new voter."
> Hmm, I thought that we found using ApiVersions unreliable (
> https://issues.apache.org/jira/browse/KAFKA-15230) and therefore
> introduced ControllerRegistrationRequest to propagate this information.
> ControllerRegistrationRequest can be made at step 1 during catchup.

The context of that Jira is the UpdateFeatures request. When
upgrading a feature the controller was checking the ApiVersions
responses in the NetworkClient. This is incorrect because those
entries and connections can time out. The other issue is that they
wanted UpdateFeatures to work even if the controller were offline,
similar to UpdateFeatures working if the brokers are offline.

I don't have this concern with AddVoter since it is already expected
that the KRaft voter (controller) is online and fetching from the
leader when the user attempts to add the voter.

> 18.4 "In 4., the new replica will be part of the quorum so the leader will
> start sending BeginQuorumEpoch requests to this replica."
> Hmm, the leader should have sent BeginQuorumEpoch at step 1 so that the new
> replica can catch up from it, right? Aslo, step 4 above only mentions
> ApiVersions RPC, not BeginQuorumEpoch.

Apologies, that should read: "In 7., ..." I think I updated the
bulleted outline and forgot to update the section that follows.

Before the VotersRecord gets appended to the log the new replica
should be an observer. Observers discover the leader by sending Fetch
requests to the bootstrap servers (either
controller.quorum.bootstrap.servers or controller.quorum.voters).

This is part of the current behavior and is needed so that the new
voters (currently observers) can catch up to the leader before getting
added to the voters set.

The leader only sends BeginQuorumEpoch to voters because those are the
only replicas and endpoints that the leader knows about and knows how
to connect to.

> 19. Vote: It's kind of weird that VoterUuid is at the partition level.
> VoteId and VoterUuid uniquely identify a node, right? So it seems that it
> should be at the same level as VoterId?

Yeah, it is. The issue is that one node (replica) can have multiple
log directories (log.dirs property) and a different metadata log dir
(metadata.log.dir). So it is possible for one voter id (node.id) to
have a different voter uuid for different partitions.

In practice this won't happen because cluster metadata is the only
KRaft partition but it can happen in the future.

> 20. EndQuorumEpoch: "PreferredSuccessor which is an array is replica ids,
> will be replaced by PreferredCandidates"
> 20.1 But PreferredSuccessor is still listed in the response.

It shouldn't be in the response. I don't see it in
EndQuorumEpochResponse. Did you mean that I used "preferred successor"
instead of "preferred candidates" in the "about" section of the JSON
schema? If so, I'll fix that.

> 20.2 Why does the response need to include endPoint?

In KRaft most RPC responses return the leader id and leader epoch if
the epoch in the request is smaller than the epoch of the receiving
replica. KRaft does this to propagate the latest leader and epoch as
fast as possible.

To all of these responses I have added endpoint information now that
the voter set and endpoints are dynamic.

> 21. "The KRaft implementation and protocol describe in KIP-595 and KIP-630
> never read from the log or snapshot."
> This seems confusing. The KIP-595 protocol does read the log to replicate
> it, right?

You are right. This is not accurate. KRaft reads from the log and
snapshot. I meant to say that the KRaft protocol described in those
KIPs is not dependent on the content of the control record in the
snapshot or log. That is changing with this KIP.

> 22. "create a snapshot at 00000000000000000000-0000000000.checkpoint with
> the necessary control records (KRaftVersionRecord and VotersRecord) to make
> this Kafka node the only voter for the quorum."
> 22.1 It seems that currently, the bootstrap checkpoint is
> named bootstrap.checkpoint. Are we changing it to
> 00000000000000000000-0000000000.checkpoint?

bootstrap.checkpoint is a controller feature and not integrated into the
KRaft layer. For example, that file is read directly by the controller
without interacting with KRaft. For this feature we need a checkpoint
that integrates with the KRaft checkpoint mechanisms.

Do you mind if I resolve this inconsistency in a future KIP? It should
be possible to get rid of the bootstrap.checkpoint and fix the
controller implementation to always let the RaftClient manage the
checkpoint's lifecycle.

> 22.2 Just to be clear, records in the bootstrap snapshot will be propagated
> to QuorumStateData, right?

This is true in the current KIP. As I mentioned earlier in this email,
I am going to remove voter information from QuorumStateData. It is not
useful since KRaft needs to support the case when log truncation
includes VotersRecord control records. This means that the local
replica will always discover the voters set by reading and decoding
the local snapshot and log segments.

> 23. kafka-metadata-quorum: In the output, we include the endpoints for the
> voters, but not the observers. Why the inconsistency?

Yes. This is intentional. At the KRaft layer the leader doesn't know
the endpoints for the observers. The leader doesn't need to know those
endpoints because in the KRaft protocol the leader never sends
requests to observers. The KRaft leader will only print the endpoints
contained in the VotersRecord control records in the local log.

> 24. "The tool will set the TimoutMs for the AddVoterRequest to 30 seconds."
>   Earlier, we have "The TimoutMs for the AddVoter RPC will be set to 10
> minutes". Should we make them consistent?

Yeah. They are different clients. The admin client and
"kafka-metadata-quorum add-controller" will use 30 seconds. The
controller's network client was going to use 10 minutes. Now that I
think about it, it is better to keep the timeout lower and consistent.
The controller needs to implement request retrying anyway. I'll change
the KIP to use 30 seconds for both cases.

> 25. "The replicas will update these order list of voters set whenever the
> latest snapshot id increases, a VotersRecord control record is read from
> the log and the log is truncated."
>   Why will the voters set change when a new snapshot is created?

The implementation needs to keep a list of voter sets (e.g.
List[(Offset: Int, Voters: Set[Voter])]). It needs to keep this list of
voters sets for any VotersRecord between the latest snapshot and the
LEO. It needs to do this for two reasons:

1. The replica will need to generate a snapshot at some offset X
between the latest snapshot and the HWM. The replica needs to know the
latest voters set (VotersRecord) at that offset.
2. The replica may be asked (through the DivergingEpoch in the Fetch
response) to truncate its LEO. It is more efficient to just remove
elements from the list of voters sets than to re-read the latest
snapshot and the following log segments.
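
As a rough illustration of that list, here is a Python sketch. The
class and method names are hypothetical and this is not the planned
KafkaRaftClient implementation. I am assuming that a snapshot end
offset is exclusive and that truncating to a new LEO removes every
record at or after that offset.

```python
import bisect
from typing import FrozenSet, List, Optional


class VoterSetHistory:
    """Ordered (offset, voters set) entries between the snapshot and LEO."""

    def __init__(self) -> None:
        self._offsets: List[int] = []
        self._voter_sets: List[FrozenSet[str]] = []

    def append(self, offset: int, voters: FrozenSet[str]) -> None:
        """Record a VotersRecord read from the log at the given offset."""
        if self._offsets and offset <= self._offsets[-1]:
            raise ValueError("offsets must be strictly increasing")
        self._offsets.append(offset)
        self._voter_sets.append(voters)

    def voters_at(self, offset: int) -> Optional[FrozenSet[str]]:
        """Latest voters set at or before offset (reason 1: snapshots)."""
        index = bisect.bisect_right(self._offsets, offset)
        return self._voter_sets[index - 1] if index > 0 else None

    def truncate_to(self, new_end_offset: int) -> None:
        """Drop entries at or after the new LEO (reason 2: truncation)."""
        index = bisect.bisect_left(self._offsets, new_end_offset)
        del self._offsets[index:]
        del self._voter_sets[index:]

    def trim_before(self, snapshot_end_offset: int) -> None:
        """Keep the set in effect at the snapshot and everything after it."""
        index = bisect.bisect_left(self._offsets, snapshot_end_offset)
        keep_from = max(index - 1, 0)
        del self._offsets[:keep_from]
        del self._voter_sets[:keep_from]
```

Truncation then only removes entries from the tail of the list, and a
newer snapshot only removes entries from the head, so neither case
requires re-reading the snapshot or the log segments.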

> 26. VotersRecord includes MinSupportedVersion and MaxSupportedVersion. In
> FeatureLevelRecord, we only have one finalized version. Should we be
> consistent?

The VotersRecord control record in KRaft is similar to the
RegisterBrokerRecord in metadata (controller). VotersRecord describes
all of the voters, their endpoints and supported kraft protocol
versions (e.g. 0 to 1).
The KRaftVersionRecord control record in KRaft is similar to the
FeatureLevelRecord in metadata (controller). It specifies the
currently active or finalized kraft.version.

> 27. The Raft dissertation mentioned the issue of disruptive servers after
> they are removed. "Once the cluster leader has created the Cnew entry, a
> server that is not in Cnew will no longer receive heartbeats, so it will
> time out and start new elections."
>   Have we addressed this issue?

To summarize, Diego suggests that Vote requests should be denied if
the receiving voter is a follower with an active leader. He also
suggests ignoring the epoch in the request even if it is a greater
epoch than the epoch of the local replica.

My plan is to rely on KIP-996: Pre-vote:
"When servers receive VoteRequests with the PreVote field set to true,
they will respond with VoteGranted set to
  * true if they are not a Follower and the epoch and offsets in the
Pre-Vote request satisfy the same requirements as a standard vote
  * false if otherwise"

Similar to the pre-vote KIP, I don't think we should implement Diego's
suggestion of also ignoring the higher epoch for "standard" Vote
requests:
"if a server receives a RequestVote request within the minimum
election timeout of hearing from a current leader, it does not update
its term or grant its vote"

I don't think we should do this because otherwise the replica that
sent the Vote request may never be able to participate in the KRaft
partition (even as an observer) because its on-disk epoch is greater
than the epoch of the rest of the replicas. In KRaft (and Raft) we
have an invariant where epochs are monotonically increasing.

> 28. There are quite a few typos in the KIP.
> 28.1 "the voters are the replica ID and UUID is in its own voters set
>   Does not read well.
> 28.2 "used to configured:
>   used to configure
> 28.3 "When at voter becomes a leader"
>   when a voter
> 28.4 "write an VotersRecord controler"
>   a VotersRecord; controller
> 28.5 "will get bootstrap"
>   bootstrapped
> 28.6 "the leader of the KRaft cluster metadata leader"
>   the leader of the KRaft cluster metadata partition
> 28.7 "until the call as been acknowledge"
>   has been acknowledged
> 28.8 "As describe in KIP-595"
>   described
> 28.9 "The KRaft implementation and protocol describe"
>   described
> 28.10 "In other, the KRaft topic partition won't"
>   In other words
> 28.11 "greater than their epic"
>   epoch
> 28.12 "so their state will be tracking using their ID and UUID"
>   tracked

Thanks. I'll fix these typos and see if I can find more typos in the KIP.
--
-José
