This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new fde8c7d4f RATIS-2242 change consistency criteria of heartbeat during appendLog (#1215)
fde8c7d4f is described below
commit fde8c7d4f37f37c61d5e270cf069a8a604ea0d6e
Author: William Song <[email protected]>
AuthorDate: Fri Feb 21 03:38:50 2025 +0800
RATIS-2242 change consistency criteria of heartbeat during appendLog (#1215)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 21 +++-
.../apache/ratis/server/impl/RaftServerProxy.java | 3 +-
.../apache/ratis/server/impl/ServerImplUtils.java | 110 +++++++++++++++++++++
3 files changed, 129 insertions(+), 5 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c67329cae..a6798c48b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.server.impl;
-import java.util.concurrent.CountDownLatch;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.Timekeeper;
@@ -25,11 +24,11 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.proto.RaftProtos.LogInfoProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
@@ -82,6 +81,8 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.LeaderElection.Phase;
import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
+import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices;
+import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
@@ -113,6 +114,7 @@ import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import java.io.File;
import java.io.IOException;
@@ -128,6 +130,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@@ -260,6 +263,7 @@ class RaftServerImpl implements RaftServer.Division,
private final ThreadGroup threadGroup;
private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
+ private final NavigableIndices appendLogTermIndices = new NavigableIndices();
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy
proxy, RaftStorage.StartupOption option)
throws IOException {
@@ -1687,10 +1691,19 @@ class RaftServerImpl implements RaftServer.Division,
});
}
+ /**
+ * Queue the given entries to be appended to the log after any previously queued append.
+ *
+ * Before queuing, the (term, index) ranges of the entries are recorded in
+ * appendLogTermIndices, so that checkInconsistentAppendEntries can treat an entry
+ * whose append is still in flight as present. The ranges are removed again once
+ * the append completes, whether it succeeds or fails.
+ *
+ * @param entriesRef the entries to append; retained here and released when the append completes.
+ * @return a future that completes after the append and the cleanup have both run.
+ */
private CompletableFuture<Void>
appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
+ final List<ConsecutiveIndices> entriesTermIndices;
+ // Borrow the entries only for the conversion; the extra reference is released when the try block exits.
+ try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries =
entriesRef.retainAndReleaseOnClose()) {
+ entriesTermIndices = ConsecutiveIndices.convert(entries.get());
+ appendLogTermIndices.append(entriesTermIndices);
+ }
+
+ // Hold a reference for the asynchronous append; it is released in whenComplete below.
entriesRef.retain();
+ // Chain onto the previous append future so that appends run one after another.
return appendLogFuture.updateAndGet(f -> f.thenCompose(
ignored -> JavaUtils.allOf(state.getLog().append(entriesRef))))
- .whenComplete((v, e) -> entriesRef.release());
+ // Runs on both success and failure: release the reference and forget the pending ranges.
+ // Note: this is attached to the returned future, not to the one stored in appendLogFuture.
+ .whenComplete((v, e) -> {
+ entriesRef.release();
+ appendLogTermIndices.removeExisting(entriesTermIndices);
+ });
}
private long checkInconsistentAppendEntries(TermIndex previous,
List<LogEntryProto> entries) {
@@ -1717,7 +1730,7 @@ class RaftServerImpl implements RaftServer.Division,
}
// Check if "previous" is contained in current state.
- if (previous != null && !state.containsTermIndex(previous)) {
+ if (previous != null && !(appendLogTermIndices.contains(previous) ||
state.containsTermIndex(previous))) {
final long replyNextIndex = Math.min(state.getNextIndex(),
previous.getIndex());
LOG.info("{}: Failed appendEntries as previous log entry ({}) is not
found", getMemberId(), previous);
return replyNextIndex;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 6b41c8c2a..523a41833 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -653,7 +653,8 @@ class RaftServerProxy implements RaftServer {
try {
final RaftGroupId groupId =
ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
return getImplFuture(groupId)
- .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() ->
impl.appendEntriesAsync(requestRef)));
+ .thenCompose(impl -> JavaUtils.callAsUnchecked(
+ () -> impl.appendEntriesAsync(requestRef),
CompletionException::new));
} finally {
requestRef.release();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index e26c6e0ab..c5010a534 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -40,11 +40,121 @@ import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
/** Server utilities for internal use. */
public final class ServerImplUtils {
+ /**
+ * A run of {@code count} consecutive log indices starting at {@code startIndex},
+ * whose entries all have the same {@code term}. Instances are immutable.
+ */
+ static class ConsecutiveIndices {
+ /**
+ * Convert the given entries to a list of {@link ConsecutiveIndices},
+ * with one element per run of consecutive entries that share the same term.
+ *
+ * @param entries the entries to convert; may be null or empty. Their indices must be
+ * consecutive and their terms must not decrease; otherwise a Preconditions
+ * assertion fails.
+ * @return an empty list for null/empty input, a singleton list when all entries share
+ * a term, or otherwise a new ArrayList holding the runs in index order.
+ */
+ static List<ConsecutiveIndices> convert(List<LogEntryProto> entries) {
+ if (entries == null || entries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // Created lazily: stays null while every entry seen so far has the same term.
+ List<ConsecutiveIndices> indices = null;
+
+ // State of the run currently being accumulated.
+ LogEntryProto previous = entries.get(0);
+ long startIndex = previous.getIndex();
+ int count = 1;
+
+ for (int i = 1; i < entries.size(); i++) {
+ final LogEntryProto current = entries.get(i);
+ // validate if the indices are consecutive
+ Preconditions.assertSame(previous.getIndex() + 1, current.getIndex(),
"index");
+
+ if (current.getTerm() == previous.getTerm()) {
+ count++;
+ } else {
+ // validate if the terms are increasing
+ Preconditions.assertTrue(previous.getTerm() < current.getTerm(),
"term");
+ // Term changed: close the current run and start a new one at this entry.
+ if (indices == null) {
+ indices = new ArrayList<>();
+ }
+ indices.add(new ConsecutiveIndices(previous.getTerm(), startIndex,
count));
+
+ startIndex = current.getIndex();
+ count = 1;
+ }
+ previous = current;
+ }
+
+ // Close the final run; if it is the only one, avoid allocating an ArrayList.
+ final ConsecutiveIndices last = new
ConsecutiveIndices(previous.getTerm(), startIndex, count);
+ if (indices == null) {
+ return Collections.singletonList(last);
+ } else {
+ indices.add(last);
+ return indices;
+ }
+ }
+
+ /** The term shared by every index in this run. */
+ private final long term;
+ /** The first index of this run. */
+ private final long startIndex;
+ /** The number of indices in this run; always positive. */
+ private final int count;
+
+ /** Requires {@code count > 0}, which is checked with Preconditions. */
+ ConsecutiveIndices(long term, long startIndex, int count) {
+ Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0 ");
+ this.term = term;
+ this.startIndex = startIndex;
+ this.count = count;
+ }
+
+ /** @return the index right after the last index of this run. */
+ long getNextIndex() {
+ return startIndex + count;
+ }
+
+ /** @return the term if {@code index} is in [startIndex, startIndex + count); otherwise, null. */
+ Long getTerm(long index) {
+ final long diff = index - startIndex;
+ return diff < 0 || diff >= count ? null: term;
+ }
+ }
+
+ /**
+ * A data structure to support the {@link #contains(TermIndex)} method.
+ * It holds non-overlapping {@link ConsecutiveIndices} runs keyed by their start index,
+ * because {@link #append(List)} requires each new run to start right after the current last run.
+ * The map is accessed only from synchronized methods; {@link #contains(TermIndex)}
+ * reads it through the synchronized {@link #getTerm(long)}.
+ */
+ static class NavigableIndices {
+ private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>();
+
+ /** @return true iff the index of {@code ti} is held by a run whose term equals the term of {@code ti}. */
+ boolean contains(TermIndex ti) {
+ final Long term = getTerm(ti.getIndex());
+ // NOTE(review): relies on TermIndex.getTerm() returning a primitive long, so that == unboxes
+ // term and compares values rather than references -- confirm.
+ return term != null && term == ti.getTerm();
+ }
+
+ /** @return the term of the given index if one of the runs covers it; otherwise, null. */
+ synchronized Long getTerm(long index) {
+ if (map.isEmpty()) {
+ return null;
+ }
+
+ // The run with the greatest start index <= index; since runs do not overlap, it is the only candidate.
+ final Map.Entry<Long, ConsecutiveIndices> floorEntry =
map.floorEntry(index);
+ if (floorEntry == null) {
+ return null;
+ }
+ return floorEntry.getValue().getTerm(index);
+ }
+
+ /**
+ * Add the given runs in order. When the map is non-empty, each run must start exactly at
+ * the next index of the current last run; otherwise a Preconditions assertion fails.
+ * NOTE(review): entries that overlap or repeat a range whose append is still pending would
+ * fail this check -- confirm that callers cannot submit such entries.
+ */
+ synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
+ for(ConsecutiveIndices indices : entriesTermIndices) {
+ // validate startIndex
+ final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
+ if (lastEntry != null) {
+ Preconditions.assertSame(lastEntry.getValue().getNextIndex(),
indices.startIndex, "startIndex");
+ }
+ map.put(indices.startIndex, indices);
+ }
+ }
+
+ /**
+ * Remove the given runs, which must have been added earlier by {@link #append(List)}.
+ * Each removed value is checked with Preconditions.assertSame against the given run.
+ * ConsecutiveIndices does not override equals, so this is effectively an identity check,
+ * and a missing run (null removed) fails it.
+ */
+ synchronized void removeExisting(List<ConsecutiveIndices>
entriesTermIndices) {
+ for(ConsecutiveIndices indices : entriesTermIndices) {
+ final ConsecutiveIndices removed = map.remove(indices.startIndex);
+ Preconditions.assertSame(indices, removed, "removed");
+ }
+ }
+ }
+
+ /** Private constructor: this utility class is not meant to be instantiated. */
private ServerImplUtils() {
//Never constructed
}