This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 40539ece1 RATIS-2333. Fix TestInstallSnapshotNotificationWithGrpc
failure. (#1289)
40539ece1 is described below
commit 40539ece1dd582752b5ef104760ec0c84cdf9e33
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Sep 22 11:47:08 2025 -0700
RATIS-2333. Fix TestInstallSnapshotNotificationWithGrpc failure. (#1289)
---
.../main/java/org/apache/ratis/grpc/GrpcUtil.java | 18 ++++++++++--------
.../grpc/server/GrpcServerProtocolService.java | 8 ++++++--
.../server/impl/SnapshotInstallationHandler.java | 3 ++-
.../ratis/InstallSnapshotNotificationTests.java | 22 +++++++++++-----------
.../ratis/statemachine/RaftSnapshotBaseTest.java | 10 +++++-----
5 files changed, 34 insertions(+), 27 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index 3645980ab..2f9ee01ec 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -138,8 +138,10 @@ public interface GrpcUtil {
static long getCallId(Throwable t) {
if (t instanceof StatusRuntimeException) {
final Metadata trailers = ((StatusRuntimeException)t).getTrailers();
- String callId = trailers.get(CALL_ID);
- return callId != null ? Integer.parseInt(callId) : -1;
+ if (trailers != null) {
+ final String callId = trailers.get(CALL_ID);
+ return callId != null ? Integer.parseInt(callId) : -1;
+ }
}
return -1;
}
@@ -147,8 +149,8 @@ public interface GrpcUtil {
static boolean isHeartbeat(Throwable t) {
if (t instanceof StatusRuntimeException) {
final Metadata trailers = ((StatusRuntimeException)t).getTrailers();
- String isHeartbeat = trailers != null ? trailers.get(HEARTBEAT) : null;
- return isHeartbeat != null && Boolean.valueOf(isHeartbeat);
+ final String isHeartbeat = trailers != null ? trailers.get(HEARTBEAT) :
null;
+ return Boolean.parseBoolean(isHeartbeat);
}
return false;
}
@@ -156,7 +158,7 @@ public interface GrpcUtil {
static IOException unwrapIOException(Throwable t) {
final IOException e;
if (t instanceof StatusRuntimeException) {
- e = GrpcUtil.unwrapException((StatusRuntimeException) t);
+ e = unwrapException((StatusRuntimeException) t);
} else {
e = IOUtils.asIOException(t);
}
@@ -172,7 +174,7 @@ public interface GrpcUtil {
supplier.get().whenComplete((reply, exception) -> {
if (exception != null) {
warning.accept(exception);
- responseObserver.onError(GrpcUtil.wrapException(exception));
+ responseObserver.onError(wrapException(exception));
} else {
responseObserver.onNext(toProto.apply(reply));
responseObserver.onCompleted();
@@ -180,7 +182,7 @@ public interface GrpcUtil {
});
} catch (Exception e) {
warning.accept(e);
- responseObserver.onError(GrpcUtil.wrapException(e));
+ responseObserver.onError(wrapException(e));
}
}
@@ -189,7 +191,7 @@ public interface GrpcUtil {
}
class StatusRuntimeExceptionMetadataBuilder {
- private Metadata trailers = new Metadata();
+ private final Metadata trailers = new Metadata();
StatusRuntimeExceptionMetadataBuilder(Throwable t) {
trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 75362fcf8..b123c44a7 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -24,6 +24,8 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.util.ServerStringUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.MessageOrBuilder;
+import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
@@ -67,7 +69,8 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
}
}
- abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements
StreamObserver<REQUEST> {
+ abstract class ServerRequestStreamObserver<REQUEST, REPLY extends
MessageOrBuilder>
+ implements StreamObserver<REQUEST> {
private final RaftServer.Op op;
private final Supplier<String> nameSupplier;
private final StreamObserver<REPLY> responseObserver;
@@ -172,7 +175,8 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
getId(), op, getPreviousRequestString(), suffix));
requestFuture.get().thenAccept(reply -> {
BatchLogger.print(BatchLogKey.COMPLETED_REPLY, getName(),
- suffix -> LOG.info("{}: Completed {}, lastReply: {} {}",
getId(), op, reply, suffix));
+ suffix -> LOG.info("{}: Completed {}, lastReply: {} {}",
+ getId(), op, TextFormat.shortDebugString(reply), suffix));
responseObserver.onCompleted();
});
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 4f1ac4177..2a2e9e2b5 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -36,6 +36,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.util.ServerStringUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.LifeCycle;
@@ -144,7 +145,7 @@ class SnapshotInstallationHandler {
final LogEntryProto proto =
request.getLastRaftConfigurationLogEntryProto();
state.truncate(proto.getIndex());
if
(!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
- LOG.info("{}: set new configuration {} from snapshot",
getMemberId(), proto);
+ LOG.info("{}: set new configuration {} from snapshot",
getMemberId(), TextFormat.shortDebugString(proto));
state.setRaftConf(proto);
state.writeRaftConfiguration(proto);
server.getStateMachine().event().notifyConfigurationChanged(
diff --git
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 411c93120..75c62127b 100644
---
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -79,6 +79,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
RaftServerConfigKeys.Log.setSegmentSizeMax(prop,
SizeInBytes.valueOf(1024)); // 1k segment
+ RaftServerConfigKeys.LeaderElection.setMemberMajorityAdd(prop, true);
}
private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
@@ -239,9 +240,8 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
final boolean set = LEADER_SNAPSHOT_INFO_REF.compareAndSet(null,
leaderSnapshotInfo);
Assertions.assertTrue(set);
- // add two more peers
- final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
- true);
+ // Add new peer(s)
+ final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(1, true,
true);
// trigger setConfiguration
RaftServerTestUtil.runWithMinorityPeers(cluster,
Arrays.asList(change.allPeersInNewConf),
peers ->
cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
@@ -389,9 +389,9 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
follower.getRaftLog().getNextIndex());
}
- // Add two more peers who will need snapshots from the leader.
- final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
- true);
+ // Add new peer(s) who will need snapshots from the leader.
+ final int numNewPeers = 1;
+ final MiniRaftCluster.PeerChanges change =
cluster.addNewPeers(numNewPeers, true, true);
// trigger setConfiguration
RaftServerTestUtil.runWithMinorityPeers(cluster,
Arrays.asList(change.allPeersInNewConf),
peers ->
cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
@@ -412,7 +412,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
}
// Make sure each new peer got one snapshot notification.
- Assertions.assertEquals(2, numSnapshotRequests.get());
+ Assertions.assertEquals(numNewPeers, numSnapshotRequests.get());
} finally {
cluster.shutdown();
@@ -556,9 +556,9 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
final boolean set = LEADER_SNAPSHOT_INFO_REF.compareAndSet(null,
leaderSnapshotInfo);
Assertions.assertTrue(set);
- // add two more peers
- final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
- true);
+ // Add new peer(s)
+ final int numNewPeers = 1;
+ final MiniRaftCluster.PeerChanges change =
cluster.addNewPeers(numNewPeers, true, true);
// trigger setConfiguration
RaftServerTestUtil.runWithMinorityPeers(cluster,
Arrays.asList(change.allPeersInNewConf),
peers ->
cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
@@ -573,7 +573,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
}
// Make sure each new peer got at least one snapshot notification.
- Assertions.assertTrue(2 <= numSnapshotRequests.get());
+ Assertions.assertTrue(numNewPeers <= numSnapshotRequests.get());
} finally {
cluster.shutdown();
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 44ae74c4c..cd1a2eb55 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -96,16 +96,16 @@ public abstract class RaftSnapshotBaseTest<CLUSTER extends
MiniRaftCluster>
assertLogContent(leader, true);
}
- public static void assertLogContent(RaftServer.Division server, boolean
isLeader) throws Exception {
+ public static void checkMetadataEntry(RaftServer.Division server) throws
Exception {
final RaftLog log = server.getRaftLog();
final long lastIndex = log.getLastEntryTermIndex().getIndex();
final LogEntryProto e = log.get(lastIndex);
Assertions.assertTrue(e.hasMetadataEntry());
+ Assertions.assertEquals(log.getLastCommittedIndex() - 1,
e.getMetadataEntry().getCommitIndex());
+ }
- JavaUtils.attemptRepeatedly(() -> {
- Assertions.assertEquals(log.getLastCommittedIndex() - 1,
e.getMetadataEntry().getCommitIndex());
- return null;
- }, 50, BaseTest.HUNDRED_MILLIS, "CheckMetadataEntry", LOG);
+ public static void assertLogContent(RaftServer.Division server, boolean
isLeader) throws Exception {
+ JavaUtils.attempt(() -> checkMetadataEntry(server), 50, HUNDRED_MILLIS,
"checkMetadataEntry", LOG);
SimpleStateMachine4Testing simpleStateMachine =
SimpleStateMachine4Testing.get(server);
if (isLeader) {