This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d132b18c4 RATIS-2393 Add Span Context to RaftRpcRequestProto (#1341)
d132b18c4 is described below

commit d132b18c4c34530e113ce1e5f21b5928db60bb80
Author: Tak Lon (Stephen) Wu <[email protected]>
AuthorDate: Thu Mar 12 13:47:50 2026 +0800

    RATIS-2393 Add Span Context to RaftRpcRequestProto (#1341)
---
 pom.xml                                            |  31 ++++
 .../apache/ratis/client/impl/ClientProtoUtils.java |   4 +
 ratis-common/pom.xml                               |  21 +++
 .../apache/ratis/protocol/RaftClientRequest.java   |  14 ++
 .../org/apache/ratis/trace/RatisAttributes.java    |  35 ++++
 .../org/apache/ratis/trace/TraceConfigKeys.java    |  45 +++++
 .../java/org/apache/ratis/trace/TraceUtils.java    | 187 +++++++++++++++++++++
 .../java/org/apache/ratis/util/VersionInfo.java    |   8 +
 ratis-proto/src/main/proto/Raft.proto              |   6 +
 .../apache/ratis/server/impl/RaftServerImpl.java   |  13 ++
 .../server/impl/RaftServerImplTracingTests.java    | 141 ++++++++++++++++
 11 files changed, 505 insertions(+)

diff --git a/pom.xml b/pom.xml
index 240f558b3..5c098c311 100644
--- a/pom.xml
+++ b/pom.xml
@@ -189,6 +189,9 @@
     <shaded.protobuf.version>3.25.8</shaded.protobuf.version>
     <shaded.grpc.version>1.77.1</shaded.grpc.version>
 
+    <!-- OpenTelemetry versions -->
+    <opentelemetry.version>1.59.0</opentelemetry.version>
+    <opentelemetry-semconv.version>1.40.0</opentelemetry-semconv.version>
     <!-- Test properties -->
     
<maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
     <test.exclude.pattern>_</test.exclude.pattern>
@@ -398,7 +401,35 @@
         <artifactId>jakarta.annotation-api</artifactId>
         <version>${jakarta.annotation.version}</version>
       </dependency>
+
+      <!-- OpenTelemetry dependencies -->
+      <dependency>
+        <groupId>io.opentelemetry</groupId>
+        <artifactId>opentelemetry-api</artifactId>
+        <version>${opentelemetry.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.opentelemetry</groupId>
+        <artifactId>opentelemetry-sdk</artifactId>
+        <version>${opentelemetry.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.opentelemetry</groupId>
+        <artifactId>opentelemetry-sdk-testing</artifactId>
+        <version>${opentelemetry.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.opentelemetry.semconv</groupId>
+        <artifactId>opentelemetry-semconv</artifactId>
+        <version>${opentelemetry-semconv.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.opentelemetry</groupId>
+        <artifactId>opentelemetry-context</artifactId>
+        <version>${opentelemetry.version}</version>
+      </dependency>
     </dependencies>
+
   </dependencyManagement>
 
   <build>
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 36c0b3937..d2146a521 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -128,6 +128,7 @@ public interface ClientProtoUtils {
 
     
Optional.ofNullable(request.getSlidingWindowEntry()).ifPresent(b::setSlidingWindowEntry);
     
Optional.ofNullable(request.getRoutingTable()).map(RoutingTable::toProto).ifPresent(b::setRoutingTable);
+    Optional.ofNullable(request.getSpanContext()).ifPresent(b::setSpanContext);
 
     return b.setCallId(request.getCallId())
         .setToLeader(request.isToLeader())
@@ -188,6 +189,9 @@ public interface ClientProtoUtils {
     if (request.hasSlidingWindowEntry()) {
       b.setSlidingWindowEntry(request.getSlidingWindowEntry());
     }
+    if (request.hasSpanContext()) {
+      b.setSpanContext(request.getSpanContext());
+    }
     return b.setClientId(ClientId.valueOf(request.getRequestorId()))
         .setGroupId(ProtoUtils.toRaftGroupId(request.getRaftGroupId()))
         .setCallId(request.getCallId())
diff --git a/ratis-common/pom.xml b/ratis-common/pom.xml
index f6bc0ee41..ba19c73e3 100644
--- a/ratis-common/pom.xml
+++ b/ratis-common/pom.xml
@@ -38,6 +38,27 @@
       <artifactId>slf4j-api</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-context</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry.semconv</groupId>
+      <artifactId>opentelemetry-semconv</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-api</artifactId>
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index ed41f1ea2..b04402fe1 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -24,6 +24,7 @@ import 
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.proto.RaftProtos.SpanContextProto;
 import org.apache.ratis.proto.RaftProtos.StaleReadRequestTypeProto;
 import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
 import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto;
@@ -305,6 +306,7 @@ public class RaftClientRequest extends RaftClientMessage {
     private SlidingWindowEntry slidingWindowEntry;
     private RoutingTable routingTable;
     private long timeoutMs;
+    private SpanContextProto spanContext;
 
     public RaftClientRequest build() {
       return new RaftClientRequest(this);
@@ -366,6 +368,11 @@ public class RaftClientRequest extends RaftClientMessage {
       this.timeoutMs = timeoutMs;
       return this;
     }
+
+    public Builder setSpanContext(SpanContextProto spanContext) {
+      this.spanContext = spanContext;
+      return this;
+    }
   }
 
   public static Builder newBuilder() {
@@ -397,6 +404,8 @@ public class RaftClientRequest extends RaftClientMessage {
 
   private final boolean toLeader;
 
+  private final SpanContextProto spanContext;
+
   /** Construct a request for sending to the given server. */
   protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, 
RaftGroupId groupId, long callId, Type type) {
     this(newBuilder()
@@ -429,6 +438,7 @@ public class RaftClientRequest extends RaftClientMessage {
     this.slidingWindowEntry = b.slidingWindowEntry;
     this.routingTable = b.routingTable;
     this.timeoutMs = b.timeoutMs;
+    this.spanContext = b.spanContext;
   }
 
   @Override
@@ -472,6 +482,10 @@ public class RaftClientRequest extends RaftClientMessage {
     return timeoutMs;
   }
 
+  public SpanContextProto getSpanContext() {
+    return spanContext;
+  }
+
   @Override
   public String toString() {
     return super.toString() + ", seq=" + 
ProtoUtils.toString(slidingWindowEntry) + ", "
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java 
b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java
new file mode 100644
index 000000000..d74c63757
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import io.opentelemetry.api.common.AttributeKey;
+
+/**
+ * The constants in this class correspond with the guidance outlined by the 
OpenTelemetry <a href=
+ * "https://github.com/open-telemetry/semantic-conventions">Semantic
+ * Conventions</a>.
+ */
+public final class RatisAttributes {
+  public static final AttributeKey<String> CLIENT_ID = 
AttributeKey.stringKey("raft.client.id");
+  public static final AttributeKey<String> MEMBER_ID = 
AttributeKey.stringKey("raft.member.id");
+  public static final AttributeKey<String> CALL_ID = 
AttributeKey.stringKey("raft.call.id");
+
+
+  private RatisAttributes() {
+  }
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/trace/TraceConfigKeys.java 
b/ratis-common/src/main/java/org/apache/ratis/trace/TraceConfigKeys.java
new file mode 100644
index 000000000..b0a1cbd9b
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceConfigKeys.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import org.apache.ratis.conf.RaftProperties;
+
+import java.util.function.Consumer;
+
+import static org.apache.ratis.conf.ConfUtils.getBoolean;
+import static org.apache.ratis.conf.ConfUtils.setBoolean;
+
+public interface TraceConfigKeys {
+  String PREFIX = "raft.otel.tracing";
+
+  String ENABLED_KEY = PREFIX + ".enabled";
+  boolean ENABLED_DEFAULT = false;
+
+  static boolean enabled(RaftProperties properties, Consumer<String> logger) {
+    return getBoolean(properties::getBoolean, ENABLED_KEY, ENABLED_DEFAULT, 
logger);
+  }
+
+  static boolean enabled(RaftProperties properties) {
+    return enabled(properties, null);
+  }
+
+  static void setEnabled(RaftProperties properties, boolean enabled) {
+    setBoolean(properties::setBoolean, ENABLED_KEY, enabled);
+  }
+}
+
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
new file mode 100644
index 000000000..bb10e2455
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import org.apache.ratis.proto.RaftProtos.SpanContextProto;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+public final class TraceUtils {
+
+  private static final Tracer TRACER = 
GlobalOpenTelemetry.getTracer("org.apache.ratis",
+      VersionInfo.getSoftwareInfoVersion());
+
+  private static final Logger LOG = LoggerFactory.getLogger(TraceUtils.class);
+
+  private TraceUtils() {
+  }
+
+  public static Tracer getGlobalTracer() {
+    return TRACER;
+  }
+
+  /**
+   * Trace an asynchronous operation represented by a {@link 
CompletableFuture}.
+   * The future returned by {@code action} is returned as is. The span from 
{@code spanSupplier}
+   * is ended when that future completes, or immediately if {@code action} throws.
+   */
+  static <T, THROWABLE extends Throwable> CompletableFuture<T> 
traceAsyncMethod(
+      CheckedSupplier<CompletableFuture<T>, THROWABLE> action, Supplier<Span> 
spanSupplier) throws THROWABLE {
+    final Span span = spanSupplier.get();
+    try (Scope ignored = span.makeCurrent()) {
+      final CompletableFuture<T> future;
+      try {
+        future = action.get();
+      } catch (RuntimeException | Error e) {
+        setError(span, e);
+        span.end();
+        throw e;
+      } catch (Throwable t) {
+        setError(span, t);
+        span.end();
+        throw JavaUtils.<THROWABLE>cast(t);
+      }
+      endSpan(future, span);
+      return future;
+    }
+  }
+
+  public static <T, THROWABLE extends Throwable> CompletableFuture<T> 
traceAsyncMethod(
+      CheckedSupplier<CompletableFuture<T>, THROWABLE> action,
+      RaftClientRequest request, String memberId, String spanName) throws 
THROWABLE {
+    return traceAsyncMethod(action, () -> 
createServerSpanFromClientRequest(request, memberId, spanName));
+  }
+
+  public static <T, THROWABLE extends Throwable> CompletableFuture<T> 
traceAsyncMethodIfEnabled(
+      boolean enabled,
+      CheckedSupplier<CompletableFuture<T>, THROWABLE> action,
+      RaftClientRequest request, String memberId, String spanName) throws 
THROWABLE {
+    return enabled ? traceAsyncMethod(action, request, memberId, spanName) : 
action.get();
+  }
+
+  private static Span createServerSpanFromClientRequest(RaftClientRequest 
request, String memberId, String spanName) {
+    final Context remoteContext = 
extractContextFromProto(request.getSpanContext());
+    final Span span = getGlobalTracer()
+        .spanBuilder(spanName)
+        .setParent(remoteContext)
+        .setSpanKind(SpanKind.SERVER)
+        .startSpan();
+    span.setAttribute(RatisAttributes.CLIENT_ID, 
String.valueOf(request.getClientId()));
+    span.setAttribute(RatisAttributes.CALL_ID, 
String.valueOf(request.getCallId()));
+    span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
+    return span;
+  }
+
+  private static void endSpan(CompletableFuture<?> future, Span span) {
+    addListener(future, (resp, error) -> {
+      if (error != null) {
+        setError(span, error);
+      } else {
+        span.setStatus(StatusCode.OK);
+      }
+      span.end();
+    });
+  }
+
+  public static void setError(Span span, Throwable error) {
+    span.recordException(error);
+    span.setStatus(StatusCode.ERROR);
+  }
+
+  /**
+   * This method is used when you just want to add a listener to the given 
future. We will call
+   * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code 
action} to the
+   * {@code future}. Ignoring the return value of a Future is considered a 
bad practice as it may
+   * suppress exceptions thrown from the code that completes the future, so 
this method will catch
+   * and log all exceptions thrown from the {@code action} to expose possible code 
bugs.
+   * <p>
+   * The Error Prone check will always report FutureReturnValueIgnored 
because every method in
+   * the {@link CompletableFuture} class will return a new {@link 
CompletableFuture}, so you always
+   * have one future that has not been checked. So we introduce this method 
and add a
+   * {@code @SuppressWarnings} annotation here.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private static <T> void addListener(CompletableFuture<T> future,
+      BiConsumer<? super T, ? super Throwable> action) {
+    future.whenComplete((resp, error) -> {
+      try {
+        // See this post on Stack Overflow (shortened since the URL is too long),
+        // https://s.apache.org/completionexception
+        // For a chain of CompletableFuture, only the first child 
CompletableFuture can get the
+        // original exception, others will get a CompletionException, which 
wraps the original
+        // exception. So here we unwrap it before passing it to the callback 
action.
+        action.accept(resp, JavaUtils.unwrapCompletionException(error));
+      } catch (Throwable t) {
+        LOG.error("Unexpected error caught when processing CompletableFuture", 
t);
+      }
+    });
+  }
+
+  private static final TextMapPropagator PROPAGATOR =
+      GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
+
+  public static SpanContextProto injectContextToProto(Context context) {
+    Map<String, String> carrier = new TreeMap<>();
+    PROPAGATOR.inject(context, carrier, (map, key, value) -> map.put(key, 
value));
+    return SpanContextProto.newBuilder().putAllContext(carrier).build();
+  }
+
+  public static Context extractContextFromProto(SpanContextProto proto) {
+    if (proto == null || proto.getContextMap().isEmpty()) {
+      return Context.current();
+    }
+    final TextMapGetter<SpanContextProto> getter = SpanContextGetter.INSTANCE;
+    return PROPAGATOR.extract(Context.current(), proto, getter);
+  }
+}
+
+class SpanContextGetter implements TextMapGetter<SpanContextProto> {
+  static final SpanContextGetter INSTANCE = new SpanContextGetter();
+
+  @Override
+  public Iterable<String> keys(SpanContextProto carrier) {
+    return carrier.getContextMap().keySet();
+  }
+
+  @Override
+  public String get(SpanContextProto carrier, String key) {
+    return Optional.ofNullable(carrier).map(SpanContextProto::getContextMap)
+        .map(map -> map.get(key)).orElse(null);
+  }
+
+}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java 
b/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java
index 07136e9a4..4f24879f8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java
@@ -158,6 +158,14 @@ public final class VersionInfo {
     sortedMap.forEach(out);
   }
 
+  /**
+   * Get the current ratis version.
+   * @return the current ratis version string.
+   */
+  public static String getSoftwareInfoVersion() {
+    return 
VersionInfo.load(VersionInfo.class).softwareInfos.getOrDefault(SoftwareInfo.VERSION);
+  }
+
   public static void main(String[] args) {
     printSystemProperties((key, value) -> System.out.printf("%-40s = %s%n", 
key, value));
 
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index 6dbfdb15a..eba5de3b7 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -117,6 +117,7 @@ message RaftRpcRequestProto {
   uint64 callId = 4;
   bool toLeader = 5;
 
+  SpanContextProto spanContext = 11;
   repeated uint64 repliedCallIds = 12; // The call ids of the replied requests
   uint64 timeoutMs = 13;
   RoutingTableProto routingTable = 14;
@@ -569,3 +570,8 @@ message LogInfoProto {
   TermIndexProto committed = 3;
   TermIndexProto lastEntry = 4;
 }
+
+// Carrier map of key/value pairs holding the propagated OpenTelemetry trace context
+message SpanContextProto {
+  map<string, string> context = 1;
+}
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d9dd09d96..0b891f658 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -100,6 +100,8 @@ import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.TransactionContextImpl;
 import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.trace.TraceUtils;
+import org.apache.ratis.trace.TraceConfigKeys;
 import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.ConcurrentUtils;
@@ -256,6 +258,8 @@ class RaftServerImpl implements RaftServer.Division,
   private final ExecutorService serverExecutor;
   private final ExecutorService clientExecutor;
 
+  private final boolean tracingEnabled;
+
   private final AtomicBoolean firstElectionSinceStartup = new 
AtomicBoolean(true);
   private final ThreadGroup threadGroup;
 
@@ -282,6 +286,7 @@ class RaftServerImpl implements RaftServer.Division,
     this.readOption = RaftServerConfigKeys.Read.option(properties);
     this.writeIndexCache = new WriteIndexCache(properties);
     this.transactionManager = new TransactionManager(id);
+    this.tracingEnabled = TraceConfigKeys.enabled(properties);
 
     this.leaderElectionMetrics = 
LeaderElectionMetrics.getLeaderElectionMetrics(
         getMemberId(), state::getLastLeaderElapsedTimeMs);
@@ -943,6 +948,14 @@ class RaftServerImpl implements RaftServer.Division,
   @Override
   public CompletableFuture<RaftClientReply> submitClientRequestAsync(
       RaftClientRequest request) throws IOException {
+    return TraceUtils.traceAsyncMethodIfEnabled(
+        tracingEnabled,
+        () -> submitClientRequestAsyncInternal(request),
+        request, getMemberId().toString(), 
"raft.server.submitClientRequestAsync");
+  }
+
+  private CompletableFuture<RaftClientReply> submitClientRequestAsyncInternal(
+      RaftClientRequest request) throws IOException {
     assertLifeCycleState(LifeCycle.States.RUNNING);
     LOG.debug("{}: receive client request({})", getMemberId(), request);
     final Timekeeper timer = 
raftServerMetrics.getClientRequestTimer(request.getType());
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java
new file mode 100644
index 000000000..300cf51cd
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
+import org.apache.ratis.trace.TraceConfigKeys;
+import org.apache.ratis.trace.TraceUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+
+public class RaftServerImplTracingTests {
+
+  @RegisterExtension
+  private static final OpenTelemetryExtension openTelemetryExtension =
+      OpenTelemetryExtension.create();
+
+  @Test
+  public void testSubmitClientRequestAsync() throws Exception {
+    final List<SpanData> spans = submitClientRequestAndCollectNewSpans(true);
+    assertEquals(2, spans.size());
+    assertTrue(
+        spans.stream().anyMatch(s -> s.getKind() == SpanKind.CLIENT && 
s.getName().equals("client-span")),
+        "Expected at least one span with SpanKind.CLIENT"
+    );
+    assertTrue(
+        spans.stream().anyMatch(s -> s.getKind() == SpanKind.SERVER
+            && s.getName().equals("raft.server.submitClientRequestAsync")),
+        "Expected at least one span with SpanKind.SERVER"
+    );
+
+  }
+
+  @Test
+  public void testSubmitClientRequestAsyncTracingDisabled() throws Exception {
+    final List<SpanData> spans = submitClientRequestAndCollectNewSpans(false);
+    // Even when server-side tracing is disabled, we still emit the client 
span used to
+    // generate the propagated context.
+    assertEquals(1, spans.size());
+    assertTrue(
+        spans.stream().noneMatch(s -> s.getKind() == SpanKind.SERVER
+            && s.getName().equals("raft.server.submitClientRequestAsync")),
+        "Expected no SERVER span when tracing is disabled"
+    );
+    assertTrue(
+        spans.stream().anyMatch(s -> s.getKind() == SpanKind.CLIENT && 
s.getName().equals("client-span")),
+        "Expected at least one span with SpanKind.CLIENT"
+    );
+  }
+
+  private static List<SpanData> submitClientRequestAndCollectNewSpans(boolean 
enableTracing)
+      throws Exception {
+    final int before = openTelemetryExtension.getSpans().size();
+
+    final RaftServerImpl server = newRaftServerImpl(enableTracing);
+    try {
+      final RaftClientRequest request = 
newRaftClientRequest(RaftClientRequest.writeRequestType());
+
+      try {
+        server.submitClientRequestAsync(request);
+      } catch (ServerNotReadyException ignored) {
+        // server is not running; only verifying span emission
+      }
+    } finally {
+      server.close();
+    }
+
+    final List<SpanData> after = openTelemetryExtension.getSpans();
+    return new ArrayList<>(after.subList(before, after.size()));
+  }
+
+  private static RaftServerImpl newRaftServerImpl(boolean enableTracing) 
throws Exception {
+    final RaftGroup group = RaftGroup.emptyGroup();
+    final StateMachine sm = new SimpleStateMachine4Testing();
+    final RaftServerProxy proxy = mock(RaftServerProxy.class);
+    when(proxy.getId()).thenReturn(RaftPeerId.valueOf("peer1"));
+    final RaftProperties properties = new RaftProperties();
+    TraceConfigKeys.setEnabled(properties, enableTracing);
+    when(proxy.getProperties()).thenReturn(properties);
+    when(proxy.getThreadGroup()).thenReturn(new ThreadGroup("test"));
+    return new RaftServerImpl(group, sm, proxy, 
RaftStorage.StartupOption.FORMAT);
+  }
+
+  private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type 
type) {
+    final Span clientSpan =
+        openTelemetryExtension.getOpenTelemetry().getTracer("test")
+            .spanBuilder("client-span")
+            .setSpanKind(SpanKind.CLIENT)
+            .startSpan();
+    try {
+      final Context clientContext = Context.current().with(clientSpan);
+      return RaftClientRequest.newBuilder()
+          .setClientId(ClientId.randomId())
+          .setServerId(RaftPeerId.valueOf("s0"))
+          .setGroupId(RaftGroupId.randomId())
+          .setCallId(1L)
+          .setType(type)
+          .setSpanContext(TraceUtils.injectContextToProto(clientContext))
+          .build();
+    } finally {
+      clientSpan.end();
+    }
+  }
+}
+

Reply via email to