kevin-wu24 commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2565460133
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1805,6 +1809,19 @@ private boolean handleFetchResponse(
                 OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
                     OptionalLong.empty() :
                     OptionalLong.of(partitionResponse.highWatermark());
                 updateFollowerHighWatermark(state, highWatermark);
+
+                if (initHighWatermark < 0 && partitionResponse.highWatermark() >= 0) {
+                    initHighWatermark = partitionResponse.highWatermark();
+
+                    state.highWatermark().ifPresent(hw -> {
+                        if (hw.offset() >= initHighWatermark && nodeId.isPresent()) {
+                            hasJoined = partitionState.lastVoterSet().isVoter(
+                                ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId));
+                        }
+                    }
+                    );
Review Comment:
Hmm, this `if` check seems incorrect. Doesn't it mean we never enter this
block again once the leader sends us a `partitionResponse.highWatermark() >= 0`?
After that first response `initHighWatermark` is set, so the
`initHighWatermark < 0` guard stays false.
We should be checking the `highWatermark` against our local LEO (log end
offset): if our LEO >= HWM, the local node considers itself "caught up" and can
set `hasJoined` to the proper value. I believe this is correct because there can
be at most one uncommitted voters record at a time.
Importantly, we need to enforce the state transition requirements from my
comment above.
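To make the suggestion concrete, here is a rough sketch of the check I have in mind. It is a simplified, standalone model and not the actual `KafkaRaftClient` code: `JoinTracker`, `onFetchResponse`, and the plain `Set<Integer>` voter set are hypothetical names for illustration, and the state transition requirements mentioned above are not modeled here.

```java
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical, simplified model of the follower-side check.
// None of these names come from KafkaRaftClient.
final class JoinTracker {
    private final int localId;
    private boolean hasJoined = false;

    JoinTracker(int localId) {
        this.localId = localId;
    }

    // Evaluated on every fetch response, not only the first one that
    // carries a non-negative high watermark.
    void onFetchResponse(
        OptionalLong leaderHighWatermark,
        long localLogEndOffset,
        Set<Integer> lastVoterSet
    ) {
        if (!leaderHighWatermark.isPresent()) {
            // The leader has not established a high watermark yet.
            return;
        }
        // Caught up: the local log end offset has reached the leader's HWM.
        if (localLogEndOffset >= leaderHighWatermark.getAsLong()) {
            hasJoined = lastVoterSet.contains(localId);
        }
    }

    boolean hasJoined() {
        return hasJoined;
    }
}
```

Because the comparison runs on each response, a follower that is still behind when it first sees the HWM can still flip `hasJoined` on a later fetch.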
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.