[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-22 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598986869



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -170,36 +183,38 @@ public boolean updateReplicaState(int replicaId,
 .collect(Collectors.toList());
 }
 
-private List followersByDescendingFetchOffset() {
-return new ArrayList<>(this.voterReplicaStates.values()).stream()
+private List followersByDescendingFetchOffset() {
+return new ArrayList<>(this.voterStates.values()).stream()
 .sorted()
 .collect(Collectors.toList());
 }
 
 private boolean updateEndOffset(ReplicaState state,
 LogOffsetMetadata endOffsetMetadata) {
 state.endOffset.ifPresent(currentEndOffset -> {
-if (currentEndOffset.offset > endOffsetMetadata.offset)
-throw new IllegalArgumentException("Non-monotonic update to end offset for nodeId " + state.nodeId);
+if (currentEndOffset.offset > endOffsetMetadata.offset) {
+if (state.nodeId == localId) {
+throw new IllegalStateException("Detected non-monotonic update of local " +
+"end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
+} else {
+log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}",
Review comment:
   The situation we are trying to handle is when a follower loses its disk. 
Basically the damage is already done by the time we receive the Fetch and the 
only thing we can do is let the follower try to catch back up. The problem with 
the old logic is that it prevented this even in situations which would not 
violate guarantees. I am planning to file a follow-up jira to think of some 
ways to handle disk loss situations more generally. We would like to at least 
detect the situation and see if we can prevent it from causing too much damage.
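
   For illustration, here is a self-contained toy version of the policy described above. This is not the PR's actual code; the class and helper names are invented for the example, and the behaviour shown is simply "a local end-offset regression stays fatal, a remote one is only logged so the follower can catch back up."

import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EndOffsetCheckExample {
    private static final Logger log = LoggerFactory.getLogger(EndOffsetCheckExample.class);

    // Throws for a local regression, warns for a remote one.
    static void checkEndOffset(int localId, int nodeId, Optional<Long> currentEndOffset, long newEndOffset) {
        currentEndOffset.ifPresent(current -> {
            if (current > newEndOffset) {
                if (nodeId == localId) {
                    // The local log can never legitimately move backwards.
                    throw new IllegalStateException("Detected non-monotonic update of local " +
                        "end offset: " + current + " -> " + newEndOffset);
                } else {
                    // A remote regression may mean the follower lost its disk; let it recover.
                    log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}",
                        nodeId, current, newEndOffset);
                }
            }
        });
    }

    public static void main(String[] args) {
        checkEndOffset(0, 1, Optional.of(42L), 10L); // remote regression: warning only
        checkEndOffset(0, 0, Optional.of(42L), 10L); // local regression: throws IllegalStateException
    }
}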








[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-22 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598983673



##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -184,26 +220,40 @@ private LeaderState setUpLeaderAndFollowers(int follower1,
 @Test
 public void testGetObserverStatesWithObserver() {
 int observerId = 10;
-long endOffset = 10L;
+long epochStartOffset = 10L;
 
-LeaderState state = new LeaderState(localId, epoch, endOffset, mkSet(localId), Collections.emptySet());
+LeaderState state = newLeaderState(mkSet(localId), epochStartOffset);
 long timestamp = 20L;
-assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(endOffset)));
+assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset)));
 
-assertEquals(Collections.singletonMap(observerId, endOffset), state.getObserverStates(timestamp));
+assertEquals(Collections.singletonMap(observerId, epochStartOffset), state.getObserverStates(timestamp));
 }
 
 @Test
 public void testNoOpForNegativeRemoteNodeId() {
 int observerId = -1;
-long endOffset = 10L;
+long epochStartOffset = 10L;

Review comment:
   I'm not sure I'd call it wrong. The epoch start offset is initialized as 
the current log end offset. But I thought it was better to choose a more 
explicit name.








[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-22 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598980860



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -33,22 +36,25 @@
  * they acknowledge the leader.
  */
 public class LeaderState implements EpochState {
+static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+
 private final int localId;
 private final int epoch;
 private final long epochStartOffset;
 
 private Optional<LogOffsetMetadata> highWatermark;
-private final Map voterReplicaStates = new HashMap<>();
-private final Map observerReplicaStates = new HashMap<>();
+private final Map voterStates = new HashMap<>();
+private final Map observerStates = new HashMap<>();
 private final Set<Integer> grantingVoters = new HashSet<>();
-private static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+private final Logger log;
 
 protected LeaderState(
 int localId,
 int epoch,
 long epochStartOffset,
 Set<Integer> voters,
-Set<Integer> grantingVoters
+Set<Integer> grantingVoters,
+LogContext logContext

Review comment:
   The log context is useful because it carries with it a logging prefix 
which can be used to distinguish log messages. For example, in a streams 
application, the fact that we have multiple producers can make debugging 
difficult. Or in the context of integration/system/simulation testing, we often 
get logs from multiple nodes mixed together. With a common prefix, it is easy 
to grep messages for a particular instance so long as the `LogContext` is 
carried through to all the dependencies. Sometimes it is a little annoying to 
add the extra parameter, but it is worthwhile for improved debugging whenever 
the parent object already has a log context.
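
   To make the prefixing concrete, here is a small runnable illustration of `LogContext`. The prefix format and the example class are made up for this sketch and are not taken from the PR:

import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class LogContextExample {
    public static void main(String[] args) {
        for (int nodeId = 0; nodeId < 3; nodeId++) {
            // Every logger created from this context prepends the same prefix to its messages.
            LogContext logContext = new LogContext("[Node " + nodeId + "] ");
            Logger log = logContext.logger(LogContextExample.class);
            // Identical message bodies stay distinguishable per node, so grepping
            // for "[Node 1]" isolates one instance's lines in a mixed log.
            log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}", 7, 42L, 10L);
        }
    }
}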








[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-17 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r596330007



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
 }
 }
 
+@Test
+public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+}
+
+private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+assertTrue(config.numVoters > 2,
+"This test requires the cluster to be able to recover from one 
failed node");
+
+for (int seed = 0; seed < 100; seed++) {
+// We run this test without the `MonotonicEpoch` and 
`MajorityReachedHighWatermark`
+// invariants since the loss of committed data on one node can 
violate them.
+
+Cluster cluster = new Cluster(config, seed);
+EventScheduler scheduler = new EventScheduler(cluster.random, 
cluster.time);
+scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+scheduler.addInvariant(new SingleLeader(cluster));
+scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+MessageRouter router = new MessageRouter(cluster);
+
+cluster.startAll();
+schedulePolling(scheduler, cluster, 3, 5);
+scheduler.schedule(router::deliverAll, 0, 2, 5);
+scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+RaftNode node = cluster.randomRunning().orElseThrow(() ->
+new AssertionError("Failed to find running node")
+);
+
+// Kill a random node and drop all of its persistent state. The Raft
+// protocol guarantees should still ensure we lose no committed data
+// as long as a new leader is elected before the failed node is restarted.
+cluster.kill(node.nodeId);
+cluster.deletePersistentState(node.nodeId);

Review comment:
   Good idea. Fixed.









[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-15 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594725779



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
 }
 }
 
+@Test
+public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+}
+
+private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+assertTrue(config.numVoters > 2,
+"This test requires the cluster to be able to recover from one 
failed node");
+
+for (int seed = 0; seed < 100; seed++) {
+// We run this test without the `MonotonicEpoch` and 
`MajorityReachedHighWatermark`
+// invariants since the loss of committed data on one node can 
violate them.
+
+Cluster cluster = new Cluster(config, seed);
+EventScheduler scheduler = new EventScheduler(cluster.random, 
cluster.time);
+scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+scheduler.addInvariant(new SingleLeader(cluster));
+scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+MessageRouter router = new MessageRouter(cluster);
+
+cluster.startAll();
+schedulePolling(scheduler, cluster, 3, 5);
+scheduler.schedule(router::deliverAll, 0, 2, 5);
+scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+RaftNode node = cluster.randomRunning().orElseThrow(() ->
+new AssertionError("Failed to find running node")
+);
+
+// Kill a random node and drop all of its persistent state. The Raft
+// protocol guarantees should still ensure we lose no committed data
+// as long as a new leader is elected before the failed node is restarted.
+cluster.kill(node.nodeId);
+cluster.deletePersistentState(node.nodeId);
+scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());

Review comment:
   It amounts to the same thing because `votedIdOpt` is only set when the 
election outcome has not been determined.
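
   A hedged sketch of the invariant this reply relies on: a voted-for candidate and an elected leader never coexist, so the voted id is only present while the election outcome is still undecided. The type and method names below are illustrative and do not reproduce the real `ElectionState` API.

import java.util.OptionalInt;

public class ElectionStateSketch {
    final int epoch;
    final OptionalInt leaderId;
    final OptionalInt votedId;

    private ElectionStateSketch(int epoch, OptionalInt leaderId, OptionalInt votedId) {
        this.epoch = epoch;
        this.leaderId = leaderId;
        this.votedId = votedId;
    }

    // Outcome undecided: remember only who we voted for.
    static ElectionStateSketch withVotedCandidate(int epoch, int candidateId) {
        return new ElectionStateSketch(epoch, OptionalInt.empty(), OptionalInt.of(candidateId));
    }

    // Outcome decided: record the leader and drop the vote.
    static ElectionStateSketch withElectedLeader(int epoch, int leaderId) {
        return new ElectionStateSketch(epoch, OptionalInt.of(leaderId), OptionalInt.empty());
    }

    boolean hasLeader() {
        return leaderId.isPresent();
    }

    public static void main(String[] args) {
        ElectionStateSketch undecided = withVotedCandidate(5, 2);
        ElectionStateSketch decided = withElectedLeader(5, 1);
        System.out.println(undecided.hasLeader() + " " + decided.hasLeader()); // prints: false true
    }
}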









[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-15 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594599970



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
 }
 }
 
+@Test
+public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+}
+
+private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+assertTrue(config.numVoters > 2,
+"This test requires the cluster to be able to recover from one 
failed node");
+
+for (int seed = 0; seed < 100; seed++) {
+// We run this test without the `MonotonicEpoch` and 
`MajorityReachedHighWatermark`
+// invariants since the loss of committed data on one node can 
violate them.
+
+Cluster cluster = new Cluster(config, seed);
+EventScheduler scheduler = new EventScheduler(cluster.random, 
cluster.time);
+scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+scheduler.addInvariant(new SingleLeader(cluster));
+scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+MessageRouter router = new MessageRouter(cluster);
+
+cluster.startAll();
+schedulePolling(scheduler, cluster, 3, 5);
+scheduler.schedule(router::deliverAll, 0, 2, 5);
+scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+RaftNode node = cluster.randomRunning().orElseThrow(() ->
+new AssertionError("Failed to find running node")
+);
+
+// Kill a random node and drop all of its persistent state. The Raft
+// protocol guarantees should still ensure we lose no committed data
+// as long as a new leader is elected before the failed node is restarted.
+cluster.kill(node.nodeId);
+cluster.deletePersistentState(node.nodeId);
+scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());

Review comment:
   It is checking consistent `ElectionState`, which is basically the same 
as verifying all `quorum-state` files match.
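
   For readers, a rough, hypothetical rendering of what "consistent `ElectionState`" amounts to: every node that has recorded a leader for the latest epoch must agree on the same leader, which is also what comparing the persisted `quorum-state` files across nodes would verify. This is not the simulation test's actual code; the types and method below are invented for the sketch.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;

public class ConsistentLeaderSketch {
    static final class NodeElectionState {
        final int epoch;
        final OptionalInt leaderId;
        NodeElectionState(int epoch, OptionalInt leaderId) {
            this.epoch = epoch;
            this.leaderId = leaderId;
        }
    }

    static boolean hasConsistentLeader(Collection<NodeElectionState> states) {
        int latestEpoch = states.stream().mapToInt(s -> s.epoch).max().orElse(-1);
        Set<Integer> leadersInLatestEpoch = new HashSet<>();
        for (NodeElectionState state : states) {
            if (state.epoch == latestEpoch && state.leaderId.isPresent()) {
                leadersInLatestEpoch.add(state.leaderId.getAsInt());
            }
        }
        // Consistent means at least one node knows a leader and no node disagrees.
        return leadersInLatestEpoch.size() == 1;
    }

    public static void main(String[] args) {
        List<NodeElectionState> states = new ArrayList<>();
        states.add(new NodeElectionState(5, OptionalInt.of(1)));
        states.add(new NodeElectionState(5, OptionalInt.of(1)));
        states.add(new NodeElectionState(4, OptionalInt.of(0))); // stale epoch does not count
        System.out.println(hasConsistentLeader(states)); // prints true
    }
}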









[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-12 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r593509351



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -247,7 +267,7 @@ private boolean isVoter(int remoteNodeId) {
 return voterReplicaStates.containsKey(remoteNodeId);
 }
 
-private static class ReplicaState implements Comparable<ReplicaState> {
+private static abstract class ReplicaState implements Comparable<ReplicaState> {

Review comment:
   Good point.




