jsancio commented on a change in pull request #10932: URL: https://github.com/apache/kafka/pull/10932#discussion_r660140438
########## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ########## @@ -38,6 +38,9 @@ private OptionalInt claimedEpoch = OptionalInt.empty(); private long lastOffsetSnapshotted = -1; + private int handleSnapshotCalls = 0; + private boolean handleSnapshotCalled = false; Review comment: We should only store `handleSnapshotCalls` since `handleSnapshotCalled` is always equal to `handleSnapshotCalls > 0`. ########## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ########## @@ -157,13 +162,27 @@ public synchronized void handleSnapshot(SnapshotReader<Integer> reader) { public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) { if (newLeader.isLeader(nodeId)) { log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", - committed, newLeader); + committed, newLeader); uncommitted = committed; claimedEpoch = OptionalInt.of(newLeader.epoch()); } else { log.debug("Counter uncommitted value reset after resigning leadership"); uncommitted = -1; claimedEpoch = OptionalInt.empty(); } + handleSnapshotCalled = false; + handleSnapshotCalls = 0; + } + + public boolean isLeader() { + return this.client.leaderAndEpoch().isLeader(nodeId); Review comment: I think this is the issue you reported in the Jira. The `RaftClient.Listener` should not use `RaftClient.leaderAndEpoch` to determine if it is the leader. It should instead use `RaftClient.Listener.handleLeaderChange`. For this state machine `ReplicatedCounter` we should look at the `claimedEpoch` variable. I am going to create an issue to remove this method. cc @hachikuji -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org