kevin-wu24 commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2568445112
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -500,11 +501,31 @@ public void initialize(
kafkaRaftMetrics,
externalKRaftMetrics
);
+
+ // Set up listener to track voter set changes
+ partitionState.setVoterSetChangeListener((offset, voterSet) -> {
+ // We dont need to check high watermark here since it already check by
+ // hasJoined is not empty.
+ if (nodeId.isPresent() && hasJoined.isPresent()) {
+ ReplicaKey localReplicaKey = ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId);
+ if (voterSet.isVoter(localReplicaKey) && !hasJoined.get()) {
+ logger.error("Detected that local node {} has been added to voter set at offset {}",
+ localReplicaKey, offset);
+ hasJoined = Optional.of(true);
+ }
+ }
+ });
+
// Read the entire log
logger.info("Reading KRaft snapshot and log as part of the initialization");
partitionState.updateState();
logger.info("Starting voters are {}", partitionState.lastVoterSet());
+ if (nodeId.isPresent() && canBecomeVoter && quorumConfig.autoJoin()
+ && isVoter(ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId))) {
+ hasJoined = Optional.of(true);
+ }
+
Review Comment:
It is not super obvious, but I explained here why this code is incorrect:
https://github.com/apache/kafka/pull/20859#pullrequestreview-3506989795
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3372,8 +3393,8 @@ private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, long curr
* and are configured to auto join should attempt to automatically join the voter
* set for the configured topic partition.
*/
- return !hasJoined && partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter
- && quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+ return hasJoined.isPresent() && !hasJoined.get() && partitionState.lastKraftVersion().isReconfigSupported()
Review Comment:
We need to add `!hasFetchTimeoutExpired() && local node LEO >= leader HWM`
to this check.
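A minimal sketch of how the extended check might read, written as a standalone model rather than the actual `KafkaRaftClient` method. The class `AutoJoinGate`, the method `shouldSendAddVoter`, and all parameter names are hypothetical; the real code would read these values from `FollowerState`, `partitionState`, and `quorumConfig`.

```java
import java.util.Optional;

// Hypothetical standalone model of the auto-join gate; not the KafkaRaftClient code.
final class AutoJoinGate {
    private AutoJoinGate() {}

    static boolean shouldSendAddVoter(
        Optional<Boolean> hasJoined,          // empty means membership is not yet known
        boolean reconfigSupported,
        boolean canBecomeVoter,
        boolean autoJoin,
        boolean updateVoterSetPeriodExpired,
        boolean fetchTimeoutExpired,          // assumed stand-in for hasFetchTimeoutExpired()
        long localLogEndOffset,               // local node LEO
        long leaderHighWatermark              // leader HWM
    ) {
        // Only act when membership is known and the node is not a voter.
        boolean knownNotJoined = hasJoined.isPresent() && !hasJoined.get();
        // The two conditions the review asks for: still fetching, and caught up to the leader's HWM.
        boolean caughtUp = !fetchTimeoutExpired && localLogEndOffset >= leaderHighWatermark;
        return knownNotJoined
            && reconfigSupported
            && canBecomeVoter
            && autoJoin
            && updateVoterSetPeriodExpired
            && caughtUp;
    }
}
```

With this shape, a follower whose fetch timeout has expired, or whose log end offset is still behind the leader's high watermark, would not send the request even if every other condition holds.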
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1810,18 +1836,16 @@ private boolean handleFetchResponse(
OptionalLong.empty() :
OptionalLong.of(partitionResponse.highWatermark());
updateFollowerHighWatermark(state, highWatermark);
- if (initHighWatermark < 0 && partitionResponse.highWatermark() >= 0) {
- initHighWatermark = partitionResponse.highWatermark();
-
- state.highWatermark().ifPresent(hw -> {
- if (hw.offset() >= initHighWatermark && nodeId.isPresent()) {
- hasJoined = partitionState.lastVoterSet().isVoter(
- ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId));
- }
- }
- );
+ if (nodeId.isPresent() && logEndOffset() >= partitionResponse.highWatermark()) {
Review Comment:
We should only make these state changes when auto join is enabled.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -500,11 +501,31 @@ public void initialize(
kafkaRaftMetrics,
externalKRaftMetrics
);
+
+ // Set up listener to track voter set changes
+ partitionState.setVoterSetChangeListener((offset, voterSet) -> {
Review Comment:
We should only set this listener when auto join is enabled and we are on a
controller.
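A rough sketch of the conditional registration suggested here, using stand-in types so it compiles on its own. `ListenerHolder`, `ListenerSetup`, `maybeRegister`, and the `isController` flag are hypothetical names, and the listener's value type is simplified to a `Boolean`; in the PR the listener takes an offset and a voter set, and the controller check may map to an existing field such as `canBecomeVoter` (an assumption).

```java
import java.util.Optional;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the partition state; it only models listener registration.
final class ListenerHolder {
    private Optional<BiConsumer<Long, Boolean>> listener = Optional.empty();

    void setVoterSetChangeListener(BiConsumer<Long, Boolean> newListener) {
        listener = Optional.of(newListener);
    }

    boolean hasListener() {
        return listener.isPresent();
    }
}

final class ListenerSetup {
    private ListenerSetup() {}

    // Register the listener only when auto join is enabled and the node is a controller.
    static void maybeRegister(
        ListenerHolder holder,
        boolean autoJoin,
        boolean isController,
        BiConsumer<Long, Boolean> listener
    ) {
        if (autoJoin && isController) {
            holder.setVoterSetChangeListener(listener);
        }
    }
}
```

Nodes that never auto join would then skip the callback entirely instead of tracking state they do not use.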