taklwu commented on code in PR #1443:
URL: https://github.com/apache/ratis/pull/1443#discussion_r3191158350
##########
ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java:
##########
@@ -56,4 +61,34 @@ private static Span createServerSpanFromClientRequest(RaftClientRequest request,
span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
return span;
}
+
+ /**
+ * Traces follower handling of {@link AppendEntriesRequestProto} when the leader attached trace
+ * context (client-originated) for replication.
+ */
+ public static <T> CompletableFuture<T> traceAppendEntriesAsync(
+ CheckedSupplier<CompletableFuture<T>, IOException> action,
+ AppendEntriesRequestProto request, String memberId) throws IOException {
+ if (!TraceUtils.isEnabled()) {
+ return action.get();
+ }
+ final RaftRpcRequestProto rpc = request.getServerRequest();
+ final SpanContextProto spanContext = rpc.getSpanContext();
+ // If the leader sent no parent span context, still trace as a root span
+ // rather than skipping tracing entirely.
+ final Context remoteContext = (spanContext == null || spanContext.getContextMap().isEmpty())
+ ? Context.root()
+ : TraceUtils.extractContextFromProto(spanContext);
+ return TraceUtils.traceAsyncMethod(action, () -> {
+ final Span span = TraceUtils.getGlobalTracer()
+ .spanBuilder("raft.server.appendEntriesAsync")
Review Comment:
I created a new class, SpanNames, and moved this span name into it; the change
also comes with some other refactoring.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -558,7 +568,28 @@ PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientReques
LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
LogProtoUtils.toLogEntryString(entry.getLogEntry()));
}
- return pendingRequests.add(permit, request, entry);
+ final PendingRequest pending = pendingRequests.add(permit, request, entry);
+ if (pending != null && TraceUtils.isEnabled()) {
+ final SpanContextProto spanContext = request.getSpanContext();
+ if (spanContext != null && !spanContext.getContextMap().isEmpty()) {
+ replicationTraceByLogIndex.put(pending.getTermIndex().getIndex(), spanContext);
+ }
+ }
+ return pending;
+ }
+
+ private static SpanContextProto tracingContextForReplication(List<LogEntryProto> entries,
+ ConcurrentHashMap<Long, SpanContextProto> traceByIndex) {
+ if (entries == null || entries.isEmpty()) {
+ return null;
+ }
+ for (LogEntryProto e : entries) {
+ final SpanContextProto sc = traceByIndex.get(e.getIndex());
+ if (sc != null && !sc.getContextMap().isEmpty()) {
+ return sc;
+ }
+ }
Review Comment:
AFAIK, AppendEntriesRequestProto is a single RPC carrying a batch of entries to
append, so we should only have a single parent span context for the whole batch.
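
To make the "one parent per batch" point concrete, here is a minimal sketch of
the selection rule. It is not the PR's code: it uses plain Java collections in
place of the Ratis proto types, and the class name BatchTraceContext and the
method parentForBatch are hypothetical names chosen for illustration only.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in: a trace context is modelled as Map<String, String>
// instead of SpanContextProto, and a batch as a list of log indices.
final class BatchTraceContext {
  private BatchTraceContext() {}

  /**
   * Picks a single parent context for a whole batch: the context recorded for
   * the first log index in the batch that has a non-empty one.
   */
  static Optional<Map<String, String>> parentForBatch(
      List<Long> entryIndices, Map<Long, Map<String, String>> traceByIndex) {
    if (entryIndices == null || entryIndices.isEmpty()) {
      return Optional.empty();
    }
    for (long index : entryIndices) {
      final Map<String, String> context = traceByIndex.get(index);
      if (context != null && !context.isEmpty()) {
        // The first match wins, so the batch has exactly one parent.
        return Optional.of(context);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    final Map<Long, Map<String, String>> traces = new ConcurrentHashMap<>();
    traces.put(7L, Map.of("traceparent", "ctx-a"));
    traces.put(8L, Map.of("traceparent", "ctx-b"));
    // Indices 7 and 8 both carry a context, but only one is chosen.
    System.out.println(parentForBatch(List.of(6L, 7L, 8L), traces));
  }
}
```

With this rule, a batch that mixes entries from several client requests is
attributed to only one of them; whether that is acceptable is the design
question raised here.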
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -733,6 +765,7 @@ void submitStepDownEvent(long term, StepDownReason reason) {
}
private void stepDown(long term, StepDownReason reason) {
+ replicationTraceByLogIndex.clear();
Review Comment:
So does that mean we only add tracing for the actual leader?
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -558,7 +568,28 @@ PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientReques
LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
LogProtoUtils.toLogEntryString(entry.getLogEntry()));
}
- return pendingRequests.add(permit, request, entry);
+ final PendingRequest pending = pendingRequests.add(permit, request, entry);
+ if (pending != null && TraceUtils.isEnabled()) {
+ final SpanContextProto spanContext = request.getSpanContext();
+ if (spanContext != null && !spanContext.getContextMap().isEmpty()) {
+ replicationTraceByLogIndex.put(pending.getTermIndex().getIndex(), spanContext);
+ }
+ }
+ return pending;
+ }
+
+ private static SpanContextProto tracingContextForReplication(List<LogEntryProto> entries,
+ ConcurrentHashMap<Long, SpanContextProto> traceByIndex) {
+ if (entries == null || entries.isEmpty()) {
+ return null;
+ }
+ for (LogEntryProto e : entries) {
+ final SpanContextProto sc = traceByIndex.get(e.getIndex());
Review Comment:
ack, I followed your change.