[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598986869

## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java

```diff
@@ -170,36 +183,38 @@ public boolean updateReplicaState(int replicaId,
             .collect(Collectors.toList());
     }
 
-    private List followersByDescendingFetchOffset() {
-        return new ArrayList<>(this.voterReplicaStates.values()).stream()
+    private List followersByDescendingFetchOffset() {
+        return new ArrayList<>(this.voterStates.values()).stream()
             .sorted()
             .collect(Collectors.toList());
     }
 
     private boolean updateEndOffset(ReplicaState state, LogOffsetMetadata endOffsetMetadata) {
         state.endOffset.ifPresent(currentEndOffset -> {
-            if (currentEndOffset.offset > endOffsetMetadata.offset)
-                throw new IllegalArgumentException("Non-monotonic update to end offset for nodeId " + state.nodeId);
+            if (currentEndOffset.offset > endOffsetMetadata.offset) {
+                if (state.nodeId == localId) {
+                    throw new IllegalStateException("Detected non-monotonic update of local " +
+                        "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
+                } else {
+                    log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}",
```

Review comment:

The situation we are trying to handle is when a follower loses its disk. Basically the damage is already done by the time we receive the Fetch, and the only thing we can do is let the follower try to catch back up. The problem with the old logic is that it prevented this even in situations which would not violate guarantees.

I am planning to file a follow-up jira to think of some ways to handle disk loss situations more generally. We would like to at least detect the situation and see if we can prevent it from causing too much damage.
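To make the loosened validation concrete, here is a minimal, self-contained sketch of the behavior described above: a regression of the leader's own end offset fails fast, while a lower offset reported by a remote replica only triggers a warning and is accepted so the replica can catch back up. The class and method names are illustrative assumptions, not the actual Kafka implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Illustrative sketch only; names and structure are assumptions, not the Kafka code.
public class LoosenedOffsetTracking {
    private final int localId;
    private final Map<Integer, OptionalLong> endOffsets = new HashMap<>();

    public LoosenedOffsetTracking(int localId) {
        this.localId = localId;
    }

    /**
     * Record a replica's fetch offset. A regression of the leader's own end offset is a
     * fatal bug, so we fail fast. A regression from a remote replica may simply mean it
     * lost its disk, so we log a warning and accept the lower offset so it can catch up.
     */
    public void updateEndOffset(int nodeId, long fetchOffset) {
        OptionalLong current = endOffsets.getOrDefault(nodeId, OptionalLong.empty());
        if (current.isPresent() && current.getAsLong() > fetchOffset) {
            if (nodeId == localId) {
                throw new IllegalStateException("Non-monotonic update of local end offset: "
                    + current.getAsLong() + " -> " + fetchOffset);
            }
            System.err.printf("warn: non-monotonic fetch offset from nodeId %d: %d -> %d%n",
                nodeId, current.getAsLong(), fetchOffset);
        }
        endOffsets.put(nodeId, OptionalLong.of(fetchOffset));
    }
}
```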
[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598983673

## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java

```diff
@@ -184,26 +220,40 @@ private LeaderState setUpLeaderAndFollowers(int follower1,
 
     @Test
     public void testGetObserverStatesWithObserver() {
         int observerId = 10;
-        long endOffset = 10L;
+        long epochStartOffset = 10L;
 
-        LeaderState state = new LeaderState(localId, epoch, endOffset, mkSet(localId), Collections.emptySet());
+        LeaderState state = newLeaderState(mkSet(localId), epochStartOffset);
         long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(endOffset)));
+        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset)));
 
-        assertEquals(Collections.singletonMap(observerId, endOffset), state.getObserverStates(timestamp));
+        assertEquals(Collections.singletonMap(observerId, epochStartOffset), state.getObserverStates(timestamp));
     }
 
     @Test
     public void testNoOpForNegativeRemoteNodeId() {
         int observerId = -1;
-        long endOffset = 10L;
+        long epochStartOffset = 10L;
```

Review comment:

I'm not sure I'd call it wrong. The epoch start offset is initialized as the current log end offset. But I thought it was better to choose a more explicit name.
[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598980860

## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java

```diff
@@ -33,22 +36,25 @@
  * they acknowledge the leader.
  */
 public class LeaderState implements EpochState {
+    static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+
     private final int localId;
     private final int epoch;
     private final long epochStartOffset;
     private Optional highWatermark;
 
-    private final Map voterReplicaStates = new HashMap<>();
-    private final Map observerReplicaStates = new HashMap<>();
+    private final Map voterStates = new HashMap<>();
+    private final Map observerStates = new HashMap<>();
     private final Set grantingVoters = new HashSet<>();
-    private static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+    private final Logger log;
 
     protected LeaderState(
         int localId,
         int epoch,
         long epochStartOffset,
         Set voters,
-        Set grantingVoters
+        Set grantingVoters,
+        LogContext logContext
```

Review comment:

The log context is useful because it carries with it a logging prefix which can be used to distinguish log messages. For example, in a Streams application, the fact that we have multiple producers can make debugging difficult. Or in the context of integration/system/simulation testing, we often get logs from multiple nodes mixed together. With a common prefix, it is easy to grep messages for a particular instance as long as the `LogContext` is carried through to all the dependencies. Sometimes it is a little annoying to add the extra parameter, but it is worthwhile for improved debugging whenever the parent object already has a log context.
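To illustrate the pattern described above, here is a minimal sketch (not the PR's code) of a component that accepts a `LogContext` from its parent so all of its log lines share the parent's prefix. The class name, prefix, and method are made up for the example; `LogContext.logger(Class)` is the standard utility from `org.apache.kafka.common.utils`.

```java
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

// Hypothetical dependency that receives the parent's LogContext instead of
// creating its own logger, so its messages carry the same prefix.
public class ExampleFetchTracker {
    private final Logger log;

    public ExampleFetchTracker(LogContext logContext) {
        // The returned slf4j Logger prepends the context's prefix to every message.
        this.log = logContext.logger(ExampleFetchTracker.class);
    }

    public void recordFetch(int replicaId, long fetchOffset) {
        log.debug("Received fetch from replica {} at offset {}", replicaId, fetchOffset);
    }

    public static void main(String[] args) {
        // A prefix identifying this node; grep "[RaftManager nodeId=1]" to isolate its logs.
        LogContext logContext = new LogContext("[RaftManager nodeId=1] ");
        new ExampleFetchTracker(logContext).recordFetch(2, 100L);
    }
}
```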
[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r596330007

## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java

```diff
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);
```

Review comment:

Good idea. Fixed.
[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594725779

## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java

```diff
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);
+            scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());
```

Review comment:

It amounts to the same thing because `votedIdOpt` is only set when the election outcome has not been determined.
[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594599970

## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java

```diff
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);
+            scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());
```

Review comment:

It is checking consistent `ElectionState`, which is basically the same as verifying all `quorum-state` files match.
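For intuition, a consistency check of this kind might look roughly like the sketch below: gather each node's persisted election state and require that every node that knows a leader for the latest epoch agrees on who it is. The types and method names here are assumptions for illustration, not the simulation test's actual implementation.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.OptionalInt;

// Simplified stand-in for a node's persisted quorum state (epoch + elected leader, if any).
class ElectionStateSnapshot {
    final int epoch;
    final OptionalInt leaderId;

    ElectionStateSnapshot(int epoch, OptionalInt leaderId) {
        this.epoch = epoch;
        this.leaderId = leaderId;
    }
}

class ConsistentLeaderCheck {
    /**
     * Returns true if at least one node has elected a leader for the latest epoch and
     * every node that knows a leader for that epoch agrees on the same node.
     */
    static boolean hasConsistentLeader(Collection<ElectionStateSnapshot> states) {
        int latestEpoch = states.stream().mapToInt(s -> s.epoch).max().orElse(-1);
        Optional<Integer> leader = Optional.empty();
        for (ElectionStateSnapshot state : states) {
            if (state.epoch != latestEpoch || !state.leaderId.isPresent())
                continue;
            int nodeLeader = state.leaderId.getAsInt();
            if (leader.isPresent() && leader.get() != nodeLeader)
                return false; // two nodes disagree about the leader of the latest epoch
            leader = Optional.of(nodeLeader);
        }
        return leader.isPresent();
    }
}
```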
[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r593509351

## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java

```diff
@@ -247,7 +267,7 @@ private boolean isVoter(int remoteNodeId) {
         return voterReplicaStates.containsKey(remoteNodeId);
     }
 
-    private static class ReplicaState implements Comparable {
+    private static abstract class ReplicaState implements Comparable {
```

Review comment:

Good point.