This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2664ac8e5 RATIS-2244. Reduce the number of log messages during
bootstrap (#1217)
2664ac8e5 is described below
commit 2664ac8e50bf52202b581555134cbdf33c057603
Author: venkatsambath <[email protected]>
AuthorDate: Sun Feb 2 23:13:34 2025 -0500
RATIS-2244. Reduce the number of log messages during bootstrap (#1217)
---
.../org/apache/ratis/client/impl/OrderedAsync.java | 2 +-
.../main/java/org/apache/ratis/util/BatchLogger.java | 14 +++++++-------
.../apache/ratis/grpc/server/GrpcLogAppender.java | 10 ++++++----
.../ratis/grpc/server/GrpcServerProtocolService.java | 20 ++++++++++++++++++--
.../server/impl/SnapshotInstallationHandler.java | 20 ++++++++++++--------
5 files changed, 44 insertions(+), 22 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 09c6cd4ac..1e21b171b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -213,7 +213,7 @@ public final class OrderedAsync {
final Throwable exception = e;
final String key = client.getId() + "-" + request.getCallId() + "-" +
exception;
final Consumer<String> op = suffix -> LOG.error("{} {}: Failed* {}",
suffix, client.getId(), request, exception);
- BatchLogger.warn(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
+ BatchLogger.print(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
handleException(pending, request, e);
return null;
});
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
index 38dad5c49..b57bed704 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
@@ -45,9 +45,9 @@ public final class BatchLogger {
private static final class UniqueId {
private final Key key;
- private final String name;
+ private final Object name;
- private UniqueId(Key key, String name) {
+ private UniqueId(Key key, Object name) {
this.key = Objects.requireNonNull(key, "key == null");
this.name = name;
}
@@ -99,15 +99,15 @@ public final class BatchLogger {
private static final TimeoutExecutor SCHEDULER =
TimeoutExecutor.getInstance();
private static final ConcurrentMap<UniqueId, BatchedLogEntry> LOG_CACHE =
new ConcurrentHashMap<>();
- public static void warn(Key key, String name, Consumer<String> op) {
- warn(key, name, op, key.getBatchDuration(), true);
+ public static void print(Key key, Object name, Consumer<String> op) {
+ print(key, name, op, key.getBatchDuration(), true);
}
- public static void warn(Key key, String name, Consumer<String> op,
TimeDuration batchDuration) {
- warn(key, name, op, batchDuration, true);
+ public static void print(Key key, Object name, Consumer<String> op,
TimeDuration batchDuration) {
+ print(key, name, op, batchDuration, true);
}
- public static void warn(Key key, String name, Consumer<String> op,
TimeDuration batchDuration, boolean shouldBatch) {
+ public static void print(Key key, Object name, Consumer<String> op,
TimeDuration batchDuration, boolean shouldBatch) {
if (!shouldBatch || batchDuration.isNonPositive()) {
op.accept("");
return;
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 1544975a4..89e0b4634 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -66,6 +66,7 @@ public class GrpcLogAppender extends LogAppenderBase {
private enum BatchLogKey implements BatchLogger.Key {
RESET_CLIENT,
+ INCONSISTENCY_REPLY,
APPEND_LOG_RESPONSE_HANDLER_ON_ERROR
}
@@ -217,7 +218,7 @@ public class GrpcLogAppender extends LogAppenderBase {
.orElseGet(f::getMatchIndex);
if (event.isError() && request == null) {
final long followerNextIndex = f.getNextIndex();
- BatchLogger.warn(BatchLogKey.RESET_CLIENT, f.getId() + "-" +
followerNextIndex, suffix ->
+ BatchLogger.print(BatchLogKey.RESET_CLIENT, f.getId() + "-" +
followerNextIndex, suffix ->
LOG.warn("{}: Follower failed (request=null, errorCount={}); keep
nextIndex ({}) unchanged and retry.{}",
this, errorCount, followerNextIndex, suffix),
logMessageBatchDuration);
return;
@@ -534,8 +535,9 @@ public class GrpcLogAppender extends LogAppenderBase {
break;
case INCONSISTENCY:
grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
- LOG.warn("{}: received {} reply with nextIndex {}, errorCount={},
request={}",
- this, reply.getResult(), reply.getNextIndex(), errorCount,
request);
+ BatchLogger.print(BatchLogKey.INCONSISTENCY_REPLY,
getFollower().getName() + "_" + reply.getNextIndex(),
+ suffix -> LOG.warn("{}: received {} reply with nextIndex {},
errorCount={}, request={} {}",
+ this, reply.getResult(), reply.getNextIndex(), errorCount,
request, suffix));
final long requestFirstIndex = request != null?
request.getFirstIndex(): RaftLog.INVALID_LOG_INDEX;
updateNextIndex(getNextIndexForInconsistency(requestFirstIndex,
reply.getNextIndex()));
break;
@@ -555,7 +557,7 @@ public class GrpcLogAppender extends LogAppenderBase {
LOG.info("{} is already stopped", GrpcLogAppender.this);
return;
}
- BatchLogger.warn(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR,
AppendLogResponseHandler.this.name,
+ BatchLogger.print(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR,
AppendLogResponseHandler.this.name,
suffix -> GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries" +
suffix, t),
logMessageBatchDuration, t instanceof StatusRuntimeException);
grpcServerMetrics.onRequestRetry(); // Update try counter
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 451d74c64..7e17cb3cf 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -31,6 +31,8 @@ import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.*;
import
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
+import org.apache.ratis.util.BatchLogger;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
@@ -49,6 +51,11 @@ import static
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.getAppen
class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
public static final Logger LOG =
LoggerFactory.getLogger(GrpcServerProtocolService.class);
+ private enum BatchLogKey implements BatchLogger.Key {
+ COMPLETED_REQUEST,
+ COMPLETED_REPLY
+ }
+
static class PendingServerRequest<REQUEST> {
private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
private final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -76,6 +83,7 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements
StreamObserver<REQUEST> {
private final RaftServer.Op op;
+ private final Supplier<String> nameSupplier;
private final StreamObserver<REPLY> responseObserver;
/** For ordered {@link #onNext(Object)} requests. */
private final AtomicReference<PendingServerRequest<REQUEST>>
previousOnNext = new AtomicReference<>();
@@ -86,9 +94,14 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
ServerRequestStreamObserver(RaftServer.Op op, StreamObserver<REPLY>
responseObserver) {
this.op = op;
+ this.nameSupplier = MemoizedSupplier.valueOf(() -> getId() + "_" + op);
this.responseObserver = responseObserver;
}
+ String getName() {
+ return nameSupplier.get();
+ }
+
private String getPreviousRequestString() {
return Optional.ofNullable(previousOnNext.get())
.map(PendingServerRequest::getRequest)
@@ -197,9 +210,12 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
@Override
public void onCompleted() {
if (isClosed.compareAndSet(false, true)) {
- LOG.info("{}: Completed {}, lastRequest: {}", getId(), op,
getPreviousRequestString());
+ BatchLogger.print(BatchLogKey.COMPLETED_REQUEST, getName(),
+ suffix -> LOG.info("{}: Completed {}, lastRequest: {} {}",
+ getId(), op, getPreviousRequestString(), suffix));
requestFuture.get().thenAccept(reply -> {
- LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply);
+ BatchLogger.print(BatchLogKey.COMPLETED_REPLY, getName(),
+ suffix -> LOG.info("{}: Completed {}, lastReply: {} {}",
getId(), op, reply, suffix));
responseObserver.onCompleted();
});
releaseLast();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 70027e6dd..4f1ac4177 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -36,6 +36,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.util.ServerStringUtils;
+import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
@@ -59,6 +60,11 @@ import static
org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
class SnapshotInstallationHandler {
static final Logger LOG =
LoggerFactory.getLogger(SnapshotInstallationHandler.class);
+ private enum BatchLogKey implements BatchLogger.Key {
+ INSTALL_SNAPSHOT_REQUEST,
+ INSTALL_SNAPSHOT_REPLY
+ }
+
static final TermIndex INVALID_TERM_INDEX = TermIndex.valueOf(0,
INVALID_LOG_INDEX);
private final RaftServerImpl server;
@@ -93,10 +99,9 @@ class SnapshotInstallationHandler {
}
InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto
request) throws IOException {
- if (LOG.isInfoEnabled()) {
- LOG.info("{}: receive installSnapshot: {}", getMemberId(),
- ServerStringUtils.toInstallSnapshotRequestString(request));
- }
+ BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REQUEST, getMemberId(),
+ suffix -> LOG.info("{}: receive installSnapshot: {} {}",
+ getMemberId(),
ServerStringUtils.toInstallSnapshotRequestString(request), suffix));
final InstallSnapshotReplyProto reply;
try {
reply = installSnapshotImpl(request);
@@ -104,10 +109,9 @@ class SnapshotInstallationHandler {
LOG.error("{}: installSnapshot failed", getMemberId(), e);
throw e;
}
- if (LOG.isInfoEnabled()) {
- LOG.info("{}: reply installSnapshot: {}", getMemberId(),
- ServerStringUtils.toInstallSnapshotReplyString(reply));
- }
+ BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REPLY, getMemberId(),
+ suffix -> LOG.info("{}: reply installSnapshot: {} {}",
+ getMemberId(),
ServerStringUtils.toInstallSnapshotReplyString(reply), suffix));
return reply;
}