szetszwo commented on code in PR #1443:
URL: https://github.com/apache/ratis/pull/1443#discussion_r3190584487


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -558,7 +568,28 @@ PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientReques
       LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
           LogProtoUtils.toLogEntryString(entry.getLogEntry()));
     }
-    return pendingRequests.add(permit, request, entry);
+    final PendingRequest pending = pendingRequests.add(permit, request, entry);
+    if (pending != null && TraceUtils.isEnabled()) {
+      final SpanContextProto spanContext = request.getSpanContext();
+      if (spanContext != null && !spanContext.getContextMap().isEmpty()) {
+        replicationTraceByLogIndex.put(pending.getTermIndex().getIndex(), spanContext);
+      }
+    }
+    return pending;
+  }
+
+  private static SpanContextProto tracingContextForReplication(List<LogEntryProto> entries,
+      ConcurrentHashMap<Long, SpanContextProto> traceByIndex) {
+    if (entries == null || entries.isEmpty()) {
+      return null;
+    }
+    for (LogEntryProto e : entries) {
+      final SpanContextProto sc = traceByIndex.get(e.getIndex());
+      if (sc != null && !sc.getContextMap().isEmpty()) {
+        return sc;
+      }
+    }

Review Comment:
   Since entries is a list, there could be multiple PendingRequests. Should we return multiple SpanContextProtos?
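A minimal sketch of what returning every context could look like, assuming a hypothetical `SpanCtx` record as a stand-in for `SpanContextProto` and plain `long` indices as a stand-in for `LogEntryProto#getIndex()`. It walks the whole batch and keeps every non-empty context instead of stopping at the first one:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; names and types are illustrative, not from the PR. */
final class ReplicationTraceCollector {
  /** Assumed stand-in for SpanContextProto: only its context map. */
  record SpanCtx(Map<String, String> contextMap) {}

  /** Returns the non-empty contexts of all traced entries, in log order. */
  static List<SpanCtx> tracingContextsForReplication(List<Long> entryIndices,
      Map<Long, SpanCtx> traceByIndex) {
    final List<SpanCtx> found = new ArrayList<>();
    if (entryIndices == null) {
      return found;
    }
    for (long index : entryIndices) {
      final SpanCtx sc = traceByIndex.get(index);
      // Skip untraced entries and contexts with an empty map, as the PR does.
      if (sc != null && !sc.contextMap().isEmpty()) {
        found.add(sc);
      }
    }
    return found;
  }
}
```

Sending all of them to the follower would presumably need a repeated span-context field in the request proto; that field is an assumption here and is not part of the PR as quoted.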



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -733,6 +765,7 @@ void submitStepDownEvent(long term, StepDownReason reason) {
   }
 
   private void stepDown(long term, StepDownReason reason) {
+    replicationTraceByLogIndex.clear();

Review Comment:
   This is not needed since stop() will be called.



##########
ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java:
##########
@@ -56,4 +61,34 @@ private static Span createServerSpanFromClientRequest(RaftClientRequest request,
     span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
     return span;
   }
+
+  /**
+   * Traces follower handling of {@link AppendEntriesRequestProto} when the leader attached trace
+   * context (client-originated) for replication.
+   */
+  public static <T> CompletableFuture<T> traceAppendEntriesAsync(
+      CheckedSupplier<CompletableFuture<T>, IOException> action,
+      AppendEntriesRequestProto request, String memberId) throws IOException {
+    if (!TraceUtils.isEnabled()) {
+      return action.get();
+    }
+    final RaftRpcRequestProto rpc = request.getServerRequest();
+    final SpanContextProto spanContext = rpc.getSpanContext();
+    // If the leader sent no parent span context, still trace as a root span
+    // rather than skipping tracing entirely.
+    final Context remoteContext = (spanContext == null || spanContext.getContextMap().isEmpty())
+        ? Context.root()
+        : TraceUtils.extractContextFromProto(spanContext);
+    return TraceUtils.traceAsyncMethod(action, () -> {
+      final Span span = TraceUtils.getGlobalTracer()
+          .spanBuilder("raft.server.appendEntriesAsync")

Review Comment:
   Let's add a static constant for "raft.server.appendEntriesAsync".
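A sketch of the suggested constant. The constant name and the holder class `TraceServerSketch` are hypothetical; in the PR the constant would presumably live in `TraceServer` and be passed to `spanBuilder(...)` in place of the string literal:

```java
/** Hypothetical excerpt; in the PR this constant would sit in TraceServer. */
final class TraceServerSketch {
  /** Span name for follower-side handling of AppendEntriesRequestProto. */
  static final String APPEND_ENTRIES_ASYNC_SPAN_NAME = "raft.server.appendEntriesAsync";

  private TraceServerSketch() {
    // constants holder, not instantiable
  }
}
```

The call site would then read `.spanBuilder(APPEND_ENTRIES_ASYNC_SPAN_NAME)`, so tests and dashboards can refer to the same name without duplicating the literal.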



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -354,6 +356,13 @@ boolean isApplied() {
   private final long followerMaxGapThreshold;
   private final PendingStepDown pendingStepDown;
 
+  /**
+   * Client-originated trace context keyed by log index; attached to {@link AppendEntriesRequestProto}
+   * so follower {@code appendEntries} spans join the same trace as the client write.
+   */
+  private final ConcurrentHashMap<Long, SpanContextProto> replicationTraceByLogIndex =
+      new ConcurrentHashMap<>();

Review Comment:
   Let's create a new class, say LeaderTracer, and move all the tracing code there.
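One possible shape for such a class, as a sketch only. `SpanCtx` stands in for `SpanContextProto`, the `BooleanSupplier` stands in for `TraceUtils::isEnabled`, and the method names are hypothetical. `LeaderStateImpl` would keep a single `LeaderTracer` field and call these three hooks; per the earlier review comment, cleanup happens once in `stop()` rather than in `stepDown()`:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

/** Hypothetical LeaderTracer owning all leader-side tracing state. */
final class LeaderTracer {
  /** Assumed stand-in for SpanContextProto. */
  record SpanCtx(Map<String, String> contextMap) {}

  private final BooleanSupplier tracingEnabled; // stand-in for TraceUtils::isEnabled
  private final Map<Long, SpanCtx> traceByLogIndex = new ConcurrentHashMap<>();

  LeaderTracer(BooleanSupplier tracingEnabled) {
    this.tracingEnabled = tracingEnabled;
  }

  /** Hook for addPendingRequest, once the entry has a log index. */
  void onPendingRequestAdded(long logIndex, SpanCtx spanContext) {
    if (tracingEnabled.getAsBoolean() && spanContext != null
        && !spanContext.contextMap().isEmpty()) {
      traceByLogIndex.put(logIndex, spanContext);
    }
  }

  /** Same semantics as tracingContextForReplication: first traced entry wins. */
  SpanCtx contextForReplication(List<Long> entryIndices) {
    if (entryIndices == null) {
      return null;
    }
    for (long index : entryIndices) {
      final SpanCtx sc = traceByLogIndex.get(index);
      if (sc != null) {
        return sc; // only non-empty contexts are ever stored
      }
    }
    return null;
  }

  /** Hook for LeaderStateImpl.stop(); also covers step-down. */
  void stop() {
    traceByLogIndex.clear();
  }
}
```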



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -558,7 +568,28 @@ PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientReques
       LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
           LogProtoUtils.toLogEntryString(entry.getLogEntry()));
     }
-    return pendingRequests.add(permit, request, entry);
+    final PendingRequest pending = pendingRequests.add(permit, request, entry);
+    if (pending != null && TraceUtils.isEnabled()) {
+      final SpanContextProto spanContext = request.getSpanContext();
+      if (spanContext != null && !spanContext.getContextMap().isEmpty()) {
+        replicationTraceByLogIndex.put(pending.getTermIndex().getIndex(), spanContext);
+      }
+    }
+    return pending;
+  }
+
+  private static SpanContextProto tracingContextForReplication(List<LogEntryProto> entries,
+      ConcurrentHashMap<Long, SpanContextProto> traceByIndex) {
+    if (entries == null || entries.isEmpty()) {
+      return null;
+    }
+    for (LogEntryProto e : entries) {
+      final SpanContextProto sc = traceByIndex.get(e.getIndex());

Review Comment:
   Since it needs to loop over entries with consecutive indices, it is more efficient to use a NavigableMap. I suggest using a TreeMap with a read-write lock.
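A sketch of the suggested structure, assuming a generic value type `V` in place of `SpanContextProto`; the class and method names are hypothetical. Because a batch covers consecutive indices, the caller only needs the first and last entry index, and a single `ceilingEntry` or `subMap` range query replaces one hash lookup per entry:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical index-ordered trace map: a TreeMap guarded by a read-write lock. */
final class TraceByLogIndex<V> {
  private final NavigableMap<Long, V> map = new TreeMap<>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  void put(long index, V context) {
    lock.writeLock().lock();
    try {
      map.put(index, context);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** First context in [firstIndex, lastIndex], or null; one O(log n) lookup. */
  V firstInRange(long firstIndex, long lastIndex) {
    lock.readLock().lock();
    try {
      final Map.Entry<Long, V> e = map.ceilingEntry(firstIndex);
      return e != null && e.getKey() <= lastIndex ? e.getValue() : null;
    } finally {
      lock.readLock().unlock();
    }
  }

  /** All contexts in [firstIndex, lastIndex], in index order (copied under the lock). */
  List<V> getRange(long firstIndex, long lastIndex) {
    lock.readLock().lock();
    try {
      return new ArrayList<>(map.subMap(firstIndex, true, lastIndex, true).values());
    } finally {
      lock.readLock().unlock();
    }
  }
}
```

The caller would pass `entries.get(0).getIndex()` and `entries.get(entries.size() - 1).getIndex()`. `java.util.concurrent.ConcurrentSkipListMap` is also a `NavigableMap` and would avoid the explicit lock, at the cost of weakly consistent range views.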



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.