This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d132b18c4 RATIS-2393 Add Span Context to RaftRpcRequestProto (#1341)
d132b18c4 is described below
commit d132b18c4c34530e113ce1e5f21b5928db60bb80
Author: Tak Lon (Stephen) Wu <[email protected]>
AuthorDate: Thu Mar 12 13:47:50 2026 +0800
RATIS-2393 Add Span Context to RaftRpcRequestProto (#1341)
---
pom.xml | 31 ++++
.../apache/ratis/client/impl/ClientProtoUtils.java | 4 +
ratis-common/pom.xml | 21 +++
.../apache/ratis/protocol/RaftClientRequest.java | 14 ++
.../org/apache/ratis/trace/RatisAttributes.java | 35 ++++
.../org/apache/ratis/trace/TraceConfigKeys.java | 45 +++++
.../java/org/apache/ratis/trace/TraceUtils.java | 187 +++++++++++++++++++++
.../java/org/apache/ratis/util/VersionInfo.java | 8 +
ratis-proto/src/main/proto/Raft.proto | 6 +
.../apache/ratis/server/impl/RaftServerImpl.java | 13 ++
.../server/impl/RaftServerImplTracingTests.java | 141 ++++++++++++++++
11 files changed, 505 insertions(+)
diff --git a/pom.xml b/pom.xml
index 240f558b3..5c098c311 100644
--- a/pom.xml
+++ b/pom.xml
@@ -189,6 +189,9 @@
<shaded.protobuf.version>3.25.8</shaded.protobuf.version>
<shaded.grpc.version>1.77.1</shaded.grpc.version>
+ <!-- OpenTelemetry versions -->
+ <opentelemetry.version>1.59.0</opentelemetry.version>
+ <opentelemetry-semconv.version>1.40.0</opentelemetry-semconv.version>
<!-- Test properties -->
<maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
<test.exclude.pattern>_</test.exclude.pattern>
@@ -398,7 +401,35 @@
<artifactId>jakarta.annotation-api</artifactId>
<version>${jakarta.annotation.version}</version>
</dependency>
+
+ <!-- OpenTelemetry dependencies -->
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-api</artifactId>
+ <version>${opentelemetry.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk</artifactId>
+ <version>${opentelemetry.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk-testing</artifactId>
+ <version>${opentelemetry.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry.semconv</groupId>
+ <artifactId>opentelemetry-semconv</artifactId>
+ <version>${opentelemetry-semconv.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-context</artifactId>
+ <version>${opentelemetry.version}</version>
+ </dependency>
</dependencies>
+
</dependencyManagement>
<build>
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 36c0b3937..d2146a521 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -128,6 +128,7 @@ public interface ClientProtoUtils {
Optional.ofNullable(request.getSlidingWindowEntry()).ifPresent(b::setSlidingWindowEntry);
Optional.ofNullable(request.getRoutingTable()).map(RoutingTable::toProto).ifPresent(b::setRoutingTable);
+ Optional.ofNullable(request.getSpanContext()).ifPresent(b::setSpanContext);
return b.setCallId(request.getCallId())
.setToLeader(request.isToLeader())
@@ -188,6 +189,9 @@ public interface ClientProtoUtils {
if (request.hasSlidingWindowEntry()) {
b.setSlidingWindowEntry(request.getSlidingWindowEntry());
}
+ if (request.hasSpanContext()) {
+ b.setSpanContext(request.getSpanContext());
+ }
return b.setClientId(ClientId.valueOf(request.getRequestorId()))
.setGroupId(ProtoUtils.toRaftGroupId(request.getRaftGroupId()))
.setCallId(request.getCallId())
diff --git a/ratis-common/pom.xml b/ratis-common/pom.xml
index f6bc0ee41..ba19c73e3 100644
--- a/ratis-common/pom.xml
+++ b/ratis-common/pom.xml
@@ -38,6 +38,27 @@
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-context</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk-testing</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry.semconv</groupId>
+ <artifactId>opentelemetry-semconv</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index ed41f1ea2..b04402fe1 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -24,6 +24,7 @@ import
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.proto.RaftProtos.SpanContextProto;
import org.apache.ratis.proto.RaftProtos.StaleReadRequestTypeProto;
import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto;
@@ -305,6 +306,7 @@ public class RaftClientRequest extends RaftClientMessage {
private SlidingWindowEntry slidingWindowEntry;
private RoutingTable routingTable;
private long timeoutMs;
+ private SpanContextProto spanContext;
public RaftClientRequest build() {
return new RaftClientRequest(this);
@@ -366,6 +368,11 @@ public class RaftClientRequest extends RaftClientMessage {
this.timeoutMs = timeoutMs;
return this;
}
+
+ /**
+ * Sets the OpenTelemetry propagation context to carry with the request.
+ * @param spanContext the serialized trace context; may be {@code null} (the default when
+ * this setter is not called), in which case no context is propagated.
+ * @return this builder.
+ */
+ public Builder setSpanContext(SpanContextProto spanContext) {
+ this.spanContext = spanContext;
+ return this;
+ }
}
public static Builder newBuilder() {
@@ -397,6 +404,8 @@ public class RaftClientRequest extends RaftClientMessage {
private final boolean toLeader;
+ private final SpanContextProto spanContext;
+
/** Construct a request for sending to the given server. */
protected RaftClientRequest(ClientId clientId, RaftPeerId serverId,
RaftGroupId groupId, long callId, Type type) {
this(newBuilder()
@@ -429,6 +438,7 @@ public class RaftClientRequest extends RaftClientMessage {
this.slidingWindowEntry = b.slidingWindowEntry;
this.routingTable = b.routingTable;
this.timeoutMs = b.timeoutMs;
+ this.spanContext = b.spanContext;
}
@Override
@@ -472,6 +482,10 @@ public class RaftClientRequest extends RaftClientMessage {
return timeoutMs;
}
+ /**
+ * @return the OpenTelemetry propagation context attached to this request,
+ * or {@code null} if the builder did not set one.
+ */
+ public SpanContextProto getSpanContext() {
+ return spanContext;
+ }
+
@Override
public String toString() {
return super.toString() + ", seq=" +
ProtoUtils.toString(slidingWindowEntry) + ", "
diff --git
a/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java
b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java
new file mode 100644
index 000000000..d74c63757
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import io.opentelemetry.api.common.AttributeKey;
+
+/**
+ * Span attribute keys recorded on Ratis spans.
+ * <p>
+ * These are Ratis-specific keys in the {@code raft.} namespace; their naming is intended to
+ * follow the guidance of the OpenTelemetry
+ * <a href="https://github.com/open-telemetry/semantic-conventions">Semantic Conventions</a>.
+ */
+public final class RatisAttributes {
+ /** Id of the client that sent the request, recorded as a string. */
+ public static final AttributeKey<String> CLIENT_ID =
AttributeKey.stringKey("raft.client.id");
+ /** Id of the Raft member (server division) handling the request, recorded as a string. */
+ public static final AttributeKey<String> MEMBER_ID =
AttributeKey.stringKey("raft.member.id");
+ /** Call id of the client request, recorded as a string. */
+ public static final AttributeKey<String> CALL_ID =
AttributeKey.stringKey("raft.call.id");
+
+
+ /** Constants holder; not instantiable. */
+ private RatisAttributes() {
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/trace/TraceConfigKeys.java
b/ratis-common/src/main/java/org/apache/ratis/trace/TraceConfigKeys.java
new file mode 100644
index 000000000..b0a1cbd9b
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceConfigKeys.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import org.apache.ratis.conf.RaftProperties;
+
+import java.util.function.Consumer;
+
+import static org.apache.ratis.conf.ConfUtils.getBoolean;
+import static org.apache.ratis.conf.ConfUtils.setBoolean;
+
+/**
+ * Configuration keys controlling OpenTelemetry tracing; all keys are under {@link #PREFIX}.
+ */
+public interface TraceConfigKeys {
+ String PREFIX = "raft.otel.tracing";
+
+ /**
+ * Whether tracing is enabled. When {@code true}, the server wraps client request handling
+ * in OpenTelemetry spans.
+ */
+ String ENABLED_KEY = PREFIX + ".enabled";
+ /** Tracing is off unless explicitly enabled. */
+ boolean ENABLED_DEFAULT = false;
+
+ /**
+ * Reads {@link #ENABLED_KEY}.
+ * @param properties the properties to read from.
+ * @param logger passed through to {@code ConfUtils.getBoolean}; presumably receives a message
+ * describing the resolved value (not visible here). May be {@code null}, since
+ * {@link #enabled(RaftProperties)} passes {@code null}.
+ * @return the configured value, with {@link #ENABLED_DEFAULT} passed as the default.
+ */
+ static boolean enabled(RaftProperties properties, Consumer<String> logger) {
+ return getBoolean(properties::getBoolean, ENABLED_KEY, ENABLED_DEFAULT,
logger);
+ }
+
+ /** Same as {@link #enabled(RaftProperties, Consumer)} without a logger. */
+ static boolean enabled(RaftProperties properties) {
+ return enabled(properties, null);
+ }
+
+ /** Sets {@link #ENABLED_KEY} to the given value. */
+ static void setEnabled(RaftProperties properties, boolean enabled) {
+ setBoolean(properties::setBoolean, ENABLED_KEY, enabled);
+ }
+}
+
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
new file mode 100644
index 000000000..bb10e2455
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import org.apache.ratis.proto.RaftProtos.SpanContextProto;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+/**
+ * Helpers for creating OpenTelemetry spans around Ratis operations and for carrying the trace
+ * context across the RPC boundary in a {@link SpanContextProto}.
+ * <p>
+ * The {@link Tracer} and {@link TextMapPropagator} are looked up from {@link GlobalOpenTelemetry}
+ * on use rather than captured in static fields at class-initialization time; see
+ * {@link #getGlobalTracer()}.
+ */
+public final class TraceUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TraceUtils.class);
+
+  /** Instrumentation scope name reported with every Ratis span. */
+  private static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.ratis";
+
+  /** Instrumentation scope version, resolved once so tracer lookups do not reload version info. */
+  private static final String INSTRUMENTATION_SCOPE_VERSION = VersionInfo.getSoftwareInfoVersion();
+
+  private TraceUtils() {
+  }
+
+  /**
+   * Returns the tracer used to create Ratis spans.
+   * <p>
+   * The tracer is resolved from {@link GlobalOpenTelemetry} on every call. Caching it in a static
+   * field would pin Ratis to whatever instance is registered when this class happens to be
+   * initialized (the no-op instance if the application has not registered one yet). Per the
+   * {@code GlobalOpenTelemetry} contract, that early {@code GlobalOpenTelemetry.get()} would also
+   * make a later {@code GlobalOpenTelemetry.set(..)} by the application fail. Resolving lazily
+   * also means that loading this class, e.g. through {@link #traceAsyncMethodIfEnabled} with
+   * tracing disabled, does not touch the global instance at all.
+   *
+   * @return the tracer for the {@code org.apache.ratis} instrumentation scope.
+   */
+  public static Tracer getGlobalTracer() {
+    return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME, INSTRUMENTATION_SCOPE_VERSION);
+  }
+
+  /** @return the globally registered text-map propagator, resolved per call like the tracer. */
+  private static TextMapPropagator getPropagator() {
+    return GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
+  }
+
+  /**
+   * Traces an asynchronous operation represented by a {@link CompletableFuture}.
+   * <p>
+   * The span from {@code spanSupplier} is made current while {@code action} runs. If
+   * {@code action} throws, the error is recorded on the span, the span is ended and the exception
+   * is rethrown. Otherwise the future returned by {@code action} is returned as-is and the span is
+   * ended when that future completes, with status OK or ERROR depending on the outcome.
+   *
+   * @param action the operation to trace.
+   * @param spanSupplier creates and starts the span covering the operation.
+   * @return the future returned by {@code action}.
+   * @throws THROWABLE if {@code action} throws.
+   */
+  static <T, THROWABLE extends Throwable> CompletableFuture<T> traceAsyncMethod(
+      CheckedSupplier<CompletableFuture<T>, THROWABLE> action, Supplier<Span> spanSupplier)
+      throws THROWABLE {
+    final Span span = spanSupplier.get();
+    try (Scope ignored = span.makeCurrent()) {
+      final CompletableFuture<T> future;
+      try {
+        future = action.get();
+      } catch (RuntimeException | Error e) {
+        setError(span, e);
+        span.end();
+        throw e;
+      } catch (Throwable t) {
+        setError(span, t);
+        span.end();
+        throw JavaUtils.<THROWABLE>cast(t);
+      }
+      endSpan(future, span);
+      return future;
+    }
+  }
+
+  /**
+   * Traces {@code action} in a SERVER span named {@code spanName}, parented to the trace context
+   * propagated in {@code request} (if any).
+   *
+   * @param action the operation to trace.
+   * @param request the client request; its span context, client id and call id are used.
+   * @param memberId the id of the member handling the request, recorded as an attribute.
+   * @param spanName the name of the SERVER span.
+   * @return the future returned by {@code action}.
+   * @throws THROWABLE if {@code action} throws.
+   */
+  public static <T, THROWABLE extends Throwable> CompletableFuture<T> traceAsyncMethod(
+      CheckedSupplier<CompletableFuture<T>, THROWABLE> action,
+      RaftClientRequest request, String memberId, String spanName) throws THROWABLE {
+    return traceAsyncMethod(action, () -> createServerSpanFromClientRequest(request, memberId, spanName));
+  }
+
+  /**
+   * Same as {@link #traceAsyncMethod(CheckedSupplier, RaftClientRequest, String, String)} when
+   * {@code enabled}; otherwise simply invokes {@code action} without creating any span.
+   *
+   * @param enabled whether to trace the call.
+   * @param action the operation to run.
+   * @param request the client request.
+   * @param memberId the id of the member handling the request.
+   * @param spanName the name of the SERVER span.
+   * @return the future returned by {@code action}.
+   * @throws THROWABLE if {@code action} throws.
+   */
+  public static <T, THROWABLE extends Throwable> CompletableFuture<T> traceAsyncMethodIfEnabled(
+      boolean enabled,
+      CheckedSupplier<CompletableFuture<T>, THROWABLE> action,
+      RaftClientRequest request, String memberId, String spanName) throws THROWABLE {
+    return enabled ? traceAsyncMethod(action, request, memberId, spanName) : action.get();
+  }
+
+  /**
+   * Starts a SERVER span parented to the context extracted from the request (or to the current
+   * context when the request carries none) and tags it with the client, call and member ids.
+   */
+  private static Span createServerSpanFromClientRequest(RaftClientRequest request, String memberId,
+      String spanName) {
+    final Context remoteContext = extractContextFromProto(request.getSpanContext());
+    final Span span = getGlobalTracer()
+        .spanBuilder(spanName)
+        .setParent(remoteContext)
+        .setSpanKind(SpanKind.SERVER)
+        .startSpan();
+    span.setAttribute(RatisAttributes.CLIENT_ID, String.valueOf(request.getClientId()));
+    span.setAttribute(RatisAttributes.CALL_ID, String.valueOf(request.getCallId()));
+    span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
+    return span;
+  }
+
+  /** Ends {@code span} when {@code future} completes: OK on success, ERROR (recorded) on failure. */
+  private static void endSpan(CompletableFuture<?> future, Span span) {
+    addListener(future, (resp, error) -> {
+      if (error != null) {
+        setError(span, error);
+      } else {
+        span.setStatus(StatusCode.OK);
+      }
+      span.end();
+    });
+  }
+
+  /** Records {@code error} on {@code span} and sets the span status to ERROR. */
+  public static void setError(Span span, Throwable error) {
+    span.recordException(error);
+    span.setStatus(StatusCode.ERROR);
+  }
+
+  /**
+   * Adds a listener to the given future by calling
+   * {@link CompletableFuture#whenComplete(BiConsumer)} with a wrapper around {@code action}.
+   * Ignoring the future returned by {@code whenComplete} is normally bad practice because it may
+   * suppress exceptions thrown by the callback, so the wrapper catches and logs every exception
+   * thrown from {@code action} to surface possible code bugs.
+   * <p>
+   * The Error Prone check would always report FutureReturnValueIgnored here because every method
+   * of {@link CompletableFuture} returns a new {@link CompletableFuture}, so one future is always
+   * left unchecked; hence this dedicated method carries the warning suppression.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private static <T> void addListener(CompletableFuture<T> future,
+      BiConsumer<? super T, ? super Throwable> action) {
+    future.whenComplete((resp, error) -> {
+      try {
+        // See this post on stack overflow (shortened since the url is too long):
+        // https://s.apache.org/completionexception
+        // For a chain of CompletableFutures, only the first child CompletableFuture gets the
+        // original exception; the others get a CompletionException wrapping it. So unwrap it
+        // before passing it to the callback action.
+        action.accept(resp, JavaUtils.unwrapCompletionException(error));
+      } catch (Throwable t) {
+        LOG.error("Unexpected error caught when processing CompletableFuture", t);
+      }
+    });
+  }
+
+  /**
+   * Serializes the trace context of {@code context} into a {@link SpanContextProto} using the
+   * globally registered propagator.
+   *
+   * @param context the context whose trace information is propagated.
+   * @return a proto holding the propagation fields written by the propagator.
+   */
+  public static SpanContextProto injectContextToProto(Context context) {
+    final Map<String, String> carrier = new TreeMap<>();
+    getPropagator().inject(context, carrier, (map, key, value) -> map.put(key, value));
+    return SpanContextProto.newBuilder().putAllContext(carrier).build();
+  }
+
+  /**
+   * Restores a trace context from a {@link SpanContextProto} using the globally registered
+   * propagator.
+   *
+   * @param proto the propagated fields; may be {@code null}.
+   * @return {@link Context#current()} when {@code proto} is {@code null} or empty, otherwise the
+   *         current context extended with the context extracted from {@code proto}.
+   */
+  public static Context extractContextFromProto(SpanContextProto proto) {
+    if (proto == null || proto.getContextMap().isEmpty()) {
+      return Context.current();
+    }
+    return getPropagator().extract(Context.current(), proto, SpanContextGetter.INSTANCE);
+  }
+}
+
+/** Reads OpenTelemetry propagation fields out of a {@link SpanContextProto} carrier. */
+class SpanContextGetter implements TextMapGetter<SpanContextProto> {
+  static final SpanContextGetter INSTANCE = new SpanContextGetter();
+
+  /** @return the keys of the carrier's context map; a {@code null} carrier has no keys. */
+  @Override
+  public Iterable<String> keys(SpanContextProto carrier) {
+    // Treat a null carrier as empty, consistently with get(..), instead of throwing an NPE.
+    final SpanContextProto proto = carrier != null ? carrier : SpanContextProto.getDefaultInstance();
+    return proto.getContextMap().keySet();
+  }
+
+  /** @return the value stored under {@code key}, or {@code null} if absent or the carrier is null. */
+  @Override
+  public String get(SpanContextProto carrier, String key) {
+    return Optional.ofNullable(carrier).map(SpanContextProto::getContextMap)
+        .map(map -> map.get(key)).orElse(null);
+  }
+
+}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java
b/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java
index 07136e9a4..4f24879f8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java
@@ -158,6 +158,14 @@ public final class VersionInfo {
sortedMap.forEach(out);
}
+ /**
+ * Get the current ratis version.
+ * <p>
+ * Each call goes through {@code VersionInfo.load(VersionInfo.class)} rather than a cached
+ * instance, so callers that need the value repeatedly may want to keep the result.
+ * @return the current ratis version string, as looked up under {@code SoftwareInfo.VERSION}.
+ */
+ public static String getSoftwareInfoVersion() {
+ return
VersionInfo.load(VersionInfo.class).softwareInfos.getOrDefault(SoftwareInfo.VERSION);
+ }
+
public static void main(String[] args) {
printSystemProperties((key, value) -> System.out.printf("%-40s = %s%n",
key, value));
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index 6dbfdb15a..eba5de3b7 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -117,6 +117,7 @@ message RaftRpcRequestProto {
uint64 callId = 4;
bool toLeader = 5;
+ SpanContextProto spanContext = 11;
repeated uint64 repliedCallIds = 12; // The call ids of the replied requests
uint64 timeoutMs = 13;
RoutingTableProto routingTable = 14;
@@ -569,3 +570,8 @@ message LogInfoProto {
TermIndexProto committed = 3;
TermIndexProto lastEntry = 4;
}
+
+// Carrier for OpenTelemetry trace-context propagation across the RPC boundary.
+// The map holds the key/value pairs written by the configured OpenTelemetry
+// TextMapPropagator on the sending side (TraceUtils.injectContextToProto) and read
+// back on the receiving side (TraceUtils.extractContextFromProto). An empty map makes
+// the receiver fall back to its current context.
+message SpanContextProto {
+ map<string, string> context = 1;
+}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d9dd09d96..0b891f658 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -100,6 +100,8 @@ import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.TransactionContextImpl;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.trace.TraceUtils;
+import org.apache.ratis.trace.TraceConfigKeys;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.ConcurrentUtils;
@@ -256,6 +258,8 @@ class RaftServerImpl implements RaftServer.Division,
private final ExecutorService serverExecutor;
private final ExecutorService clientExecutor;
+ private final boolean tracingEnabled;
+
private final AtomicBoolean firstElectionSinceStartup = new
AtomicBoolean(true);
private final ThreadGroup threadGroup;
@@ -282,6 +286,7 @@ class RaftServerImpl implements RaftServer.Division,
this.readOption = RaftServerConfigKeys.Read.option(properties);
this.writeIndexCache = new WriteIndexCache(properties);
this.transactionManager = new TransactionManager(id);
+ this.tracingEnabled = TraceConfigKeys.enabled(properties);
this.leaderElectionMetrics =
LeaderElectionMetrics.getLeaderElectionMetrics(
getMemberId(), state::getLastLeaderElapsedTimeMs);
@@ -943,6 +948,14 @@ class RaftServerImpl implements RaftServer.Division,
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
+ // When raft.otel.tracing.enabled is set, run the request inside a SERVER span parented to the
+ // span context propagated by the client (if any); otherwise call the implementation directly.
+ return TraceUtils.traceAsyncMethodIfEnabled(
+ tracingEnabled,
+ () -> submitClientRequestAsyncInternal(request),
+ request, getMemberId().toString(),
"raft.server.submitClientRequestAsync");
+ }
+
+ private CompletableFuture<RaftClientReply> submitClientRequestAsyncInternal(
+ RaftClientRequest request) throws IOException {
assertLifeCycleState(LifeCycle.States.RUNNING);
LOG.debug("{}: receive client request({})", getMemberId(), request);
final Timekeeper timer =
raftServerMetrics.getClientRequestTimer(request.getType());
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java
new file mode 100644
index 000000000..300cf51cd
--- /dev/null
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
+import org.apache.ratis.trace.TraceConfigKeys;
+import org.apache.ratis.trace.TraceUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+
+/** Verifies the OpenTelemetry spans emitted by {@link RaftServerImpl#submitClientRequestAsync}. */
+public class RaftServerImplTracingTests {
+
+  @RegisterExtension
+  private static final OpenTelemetryExtension openTelemetryExtension =
+      OpenTelemetryExtension.create();
+
+  private static final String CLIENT_SPAN_NAME = "client-span";
+  private static final String SERVER_SPAN_NAME = "raft.server.submitClientRequestAsync";
+
+  /**
+   * With tracing enabled, the server must emit a SERVER span that continues the trace of the
+   * CLIENT span whose context was propagated in the request.
+   */
+  @Test
+  public void testSubmitClientRequestAsync() throws Exception {
+    final List<SpanData> spans = submitClientRequestAndCollectNewSpans(true);
+    assertEquals(2, spans.size());
+    final SpanData clientSpan = findSpan(spans, SpanKind.CLIENT, CLIENT_SPAN_NAME);
+    final SpanData serverSpan = findSpan(spans, SpanKind.SERVER, SERVER_SPAN_NAME);
+
+    // Without these checks the test would still pass if the propagated context were ignored
+    // and the server started an unrelated root span.
+    assertEquals(clientSpan.getTraceId(), serverSpan.getTraceId(),
+        "SERVER span should belong to the propagated client trace");
+    assertEquals(clientSpan.getSpanId(), serverSpan.getParentSpanId(),
+        "SERVER span should be a child of the propagated CLIENT span");
+    assertTrue(serverSpan.getParentSpanContext().isRemote(),
+        "SERVER span parent should be the remote (extracted) client context");
+  }
+
+  /** With tracing disabled, only the CLIENT span created by the test itself is emitted. */
+  @Test
+  public void testSubmitClientRequestAsyncTracingDisabled() throws Exception {
+    final List<SpanData> spans = submitClientRequestAndCollectNewSpans(false);
+    // Even when server-side tracing is disabled, we still emit the client span used to
+    // generate the propagated context.
+    assertEquals(1, spans.size());
+    assertTrue(
+        spans.stream().noneMatch(s -> s.getKind() == SpanKind.SERVER
+            && s.getName().equals(SERVER_SPAN_NAME)),
+        "Expected no SERVER span when tracing is disabled");
+    findSpan(spans, SpanKind.CLIENT, CLIENT_SPAN_NAME);
+  }
+
+  /** Returns the first span with the given kind and name, failing the test if there is none. */
+  private static SpanData findSpan(List<SpanData> spans, SpanKind kind, String name) {
+    return spans.stream()
+        .filter(s -> s.getKind() == kind && s.getName().equals(name))
+        .findFirst()
+        .orElseThrow(() -> new AssertionError("Expected a span with SpanKind." + kind + " named " + name));
+  }
+
+  /**
+   * Submits a write request to a non-running server and returns the spans finished during the
+   * call, i.e. the client span created for the request plus any server span.
+   */
+  private static List<SpanData> submitClientRequestAndCollectNewSpans(boolean enableTracing)
+      throws Exception {
+    final int before = openTelemetryExtension.getSpans().size();
+
+    final RaftServerImpl server = newRaftServerImpl(enableTracing);
+    try {
+      final RaftClientRequest request = newRaftClientRequest(RaftClientRequest.writeRequestType());
+
+      try {
+        server.submitClientRequestAsync(request);
+      } catch (ServerNotReadyException ignored) {
+        // server is not running; only verifying span emission
+      }
+    } finally {
+      server.close();
+    }
+
+    final List<SpanData> after = openTelemetryExtension.getSpans();
+    return new ArrayList<>(after.subList(before, after.size()));
+  }
+
+  private static RaftServerImpl newRaftServerImpl(boolean enableTracing) throws Exception {
+    final RaftGroup group = RaftGroup.emptyGroup();
+    final StateMachine sm = new SimpleStateMachine4Testing();
+    final RaftServerProxy proxy = mock(RaftServerProxy.class);
+    when(proxy.getId()).thenReturn(RaftPeerId.valueOf("peer1"));
+    final RaftProperties properties = new RaftProperties();
+    TraceConfigKeys.setEnabled(properties, enableTracing);
+    when(proxy.getProperties()).thenReturn(properties);
+    when(proxy.getThreadGroup()).thenReturn(new ThreadGroup("test"));
+    return new RaftServerImpl(group, sm, proxy, RaftStorage.StartupOption.FORMAT);
+  }
+
+  /** Builds a request carrying the context of a freshly started (and ended) CLIENT span. */
+  private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type type) {
+    final Span clientSpan =
+        openTelemetryExtension.getOpenTelemetry().getTracer("test")
+            .spanBuilder(CLIENT_SPAN_NAME)
+            .setSpanKind(SpanKind.CLIENT)
+            .startSpan();
+    try {
+      final Context clientContext = Context.current().with(clientSpan);
+      return RaftClientRequest.newBuilder()
+          .setClientId(ClientId.randomId())
+          .setServerId(RaftPeerId.valueOf("s0"))
+          .setGroupId(RaftGroupId.randomId())
+          .setCallId(1L)
+          .setType(type)
+          .setSpanContext(TraceUtils.injectContextToProto(clientContext))
+          .build();
+    } finally {
+      clientSpan.end();
+    }
+  }
+}
+