This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d1e1aa8df RATIS-2395. Add Client Span for AyncImpl (#1385)
d1e1aa8df is described below
commit d1e1aa8df1fb5da4f1b13be4f22a1268fdefd8c9
Author: Tak Lon (Stephen) Wu <[email protected]>
AuthorDate: Thu Mar 26 05:03:10 2026 +0800
RATIS-2395. Add Client Span for AyncImpl (#1385)
---
.../org/apache/ratis/client/impl/AsyncImpl.java | 4 +-
.../apache/ratis/client/impl/RaftClientImpl.java | 2 +
.../org/apache/ratis/trace/RatisAttributes.java | 4 +
.../java/org/apache/ratis/trace/TraceClient.java | 63 +++++++++++++
.../java/org/apache/ratis/trace/TraceServer.java | 59 ++++++++++++
.../java/org/apache/ratis/trace/TraceUtils.java | 103 +++++++++++----------
.../org/apache/ratis/trace/TestTraceUtils.java | 66 +++++++++++++
.../apache/ratis/server/impl/RaftServerImpl.java | 9 +-
.../ratis/grpc/TestRaftAsyncWithGrpcTracing.java | 75 +++++++++++++++
.../ratis/grpc/TestRetryCacheWithGrpcTracing.java | 81 ++++++++++++++++
10 files changed, 412 insertions(+), 54 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index 8547ce266..973b0db0c 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.trace.TraceClient;
/** Async api implementations. */
class AsyncImpl implements AsyncRpcApi {
@@ -38,7 +39,8 @@ class AsyncImpl implements AsyncRpcApi {
CompletableFuture<RaftClientReply> send(
RaftClientRequest.Type type, Message message, RaftPeerId server) {
- return client.getOrderedAsync().send(type, message, server);
+ return TraceClient.asyncSend(
+ () -> client.getOrderedAsync().send(type, message, server), type,
server);
}
@Override
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index db789aef2..27ae2e6ba 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -43,6 +43,7 @@ import
org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.common.cache.Cache;
import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.ratis.trace.TraceUtils;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
@@ -218,6 +219,7 @@ public final class RaftClientImpl implements RaftClient {
.setParameters(parameters)
.build());
this.adminApi = JavaUtils.memoize(() -> new AdminImpl(this));
+ TraceUtils.setTracerWhenEnabled(properties);
}
@Override
diff --git
a/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java
b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java
index d74c63757..3c3be83e7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java
@@ -29,6 +29,10 @@ public final class RatisAttributes {
public static final AttributeKey<String> MEMBER_ID =
AttributeKey.stringKey("raft.member.id");
public static final AttributeKey<String> CALL_ID =
AttributeKey.stringKey("raft.call.id");
+ public static final AttributeKey<String> PEER_ID =
AttributeKey.stringKey("raft.peer.id");
+ public static final AttributeKey<String> OPERATION_NAME =
AttributeKey.stringKey("raft.operation.name");
+ public static final AttributeKey<String> OPERATION_TYPE =
AttributeKey.stringKey("raft.operation.type");
+
private RatisAttributes() {
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java
b/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java
new file mode 100644
index 000000000..0ab34e689
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.function.CheckedSupplier;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Client-side OpenTelemetry helpers. */
+public final class TraceClient {
+ private static final String LEADER = "LEADER";
+
+ private TraceClient() {
+ }
+
+ /**
+ * Traces an asynchronous client send ({@code Async::send}) when tracing is
enabled.
+ */
+ public static <T, THROWABLE extends Throwable> CompletableFuture<T>
asyncSend(
+ CheckedSupplier<CompletableFuture<T>, THROWABLE> action,
+ RaftClientRequest.Type type, RaftPeerId server) throws THROWABLE {
+ if (!TraceUtils.isEnabled()) {
+ return action.get();
+ }
+ return TraceUtils.traceAsyncMethod(action,
+ () -> createClientOperationSpan(type, server, "Async::send"));
+ }
+
+ private static Span createClientOperationSpan(RaftClientRequest.Type type,
RaftPeerId server,
+ String spanName) {
+ Preconditions.assertNotNull(spanName, () -> "Span name cannot be null");
+ Preconditions.assertTrue(!spanName.isEmpty(), "Span name should not be
empty");
+ String peerId = server == null ? LEADER : String.valueOf(server);
+ final Span span = TraceUtils.getGlobalTracer()
+ .spanBuilder(spanName)
+ .setSpanKind(SpanKind.CLIENT)
+ .startSpan();
+ span.setAttribute(RatisAttributes.PEER_ID, peerId);
+ span.setAttribute(RatisAttributes.OPERATION_NAME, spanName);
+ span.setAttribute(RatisAttributes.OPERATION_TYPE, String.valueOf(type));
+ return span;
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java
b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java
new file mode 100644
index 000000000..9670f0d76
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Context;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.function.CheckedSupplier;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Server-side OpenTelemetry helpers. */
+public final class TraceServer {
+ private TraceServer() {
+ }
+
+ /**
+ * Traces {@code submitClientRequestAsync} when tracing is enabled.
+ */
+ public static <T, THROWABLE extends Throwable> CompletableFuture<T>
traceAsyncMethod(
+ CheckedSupplier<CompletableFuture<T>, THROWABLE> action,
+ RaftClientRequest request, String memberId, String spanName) throws
THROWABLE {
+ if (!TraceUtils.isEnabled()) {
+ return action.get();
+ }
+ return TraceUtils.traceAsyncMethod(action,
+ () -> createServerSpanFromClientRequest(request, memberId, spanName));
+ }
+
+ private static Span createServerSpanFromClientRequest(RaftClientRequest
request, String memberId,
+ String spanName) {
+ final Context remoteContext =
TraceUtils.extractContextFromProto(request.getSpanContext());
+ final Span span = TraceUtils.getGlobalTracer()
+ .spanBuilder(spanName)
+ .setParent(remoteContext)
+ .setSpanKind(SpanKind.SERVER)
+ .startSpan();
+ span.setAttribute(RatisAttributes.CLIENT_ID,
String.valueOf(request.getClientId()));
+ span.setAttribute(RatisAttributes.CALL_ID,
String.valueOf(request.getCallId()));
+ span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
+ return span;
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
index bb10e2455..f350ca888 100644
--- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
@@ -19,15 +19,14 @@ package org.apache.ratis.trace;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapGetter;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.SpanContextProto;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.function.CheckedSupplier;
import org.apache.ratis.util.VersionInfo;
@@ -38,27 +37,58 @@ import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
+/** Common OpenTelemetry utilities shared by {@link TraceClient} and {@link
TraceServer}. */
public final class TraceUtils {
- private static final Tracer TRACER =
GlobalOpenTelemetry.getTracer("org.apache.ratis",
- VersionInfo.getSoftwareInfoVersion());
-
+ private static final AtomicReference<Tracer> TRACER = new
AtomicReference<>();
private static final Logger LOG = LoggerFactory.getLogger(TraceUtils.class);
private TraceUtils() {
}
public static Tracer getGlobalTracer() {
- return TRACER;
+ return TRACER.get();
+ }
+
+ /**
+ * Initializes the global tracer from configuration when tracing is enabled,
or clears it when
+ * disabled. Call from {@link org.apache.ratis.server.RaftServer} and
+ * {@link org.apache.ratis.client.RaftClient} construction so tracing follows
+ * {@link TraceConfigKeys}.
+ *
+ * @param properties raft configuration; tracing is on when {@link
TraceConfigKeys#enabled} is true
+ */
+ public static void setTracerWhenEnabled(RaftProperties properties) {
+ setTracerWhenEnabled(TraceConfigKeys.enabled(properties));
+ }
+
+ /**
+ * Enables or disables the tracer without reading {@link RaftProperties}.
Intended for tests and
+ * simple toggles; production code should prefer {@link
#setTracerWhenEnabled(RaftProperties)}.
+ *
+ * @param enabled when true, lazily obtains the OpenTelemetry tracer; when
false, clears it
+ */
+ public static void setTracerWhenEnabled(boolean enabled) {
+ if (enabled) {
+ TRACER.updateAndGet(previous -> previous != null ? previous
+ : GlobalOpenTelemetry.getTracer("org.apache.ratis",
VersionInfo.getSoftwareInfoVersion()));
+ } else {
+ TRACER.set(null);
+ }
+ }
+
+ static boolean isEnabled() {
+ return TRACER.get() != null;
}
/**
- * Trace an asynchronous operation represented by a {@link
CompletableFuture}.
- * The returned future will complete with the same result or error as the
original future,
- * but the provided {@code span} will be ended when the future completes.
+ * Traces an asynchronous operation represented by a {@link
CompletableFuture}. The returned future
+ * completes with the same outcome as the supplied future; the span is ended
when that future
+ * completes.
*/
static <T, THROWABLE extends Throwable> CompletableFuture<T>
traceAsyncMethod(
CheckedSupplier<CompletableFuture<T>, THROWABLE> action, Supplier<Span>
spanSupplier) throws THROWABLE {
@@ -81,40 +111,23 @@ public final class TraceUtils {
}
}
- public static <T, THROWABLE extends Throwable> CompletableFuture<T>
traceAsyncMethod(
- CheckedSupplier<CompletableFuture<T>, THROWABLE> action,
- RaftClientRequest request, String memberId, String spanName) throws
THROWABLE {
- return traceAsyncMethod(action, () ->
createServerSpanFromClientRequest(request, memberId, spanName));
- }
-
- public static <T, THROWABLE extends Throwable> CompletableFuture<T>
traceAsyncMethodIfEnabled(
- boolean enabled,
- CheckedSupplier<CompletableFuture<T>, THROWABLE> action,
- RaftClientRequest request, String memberId, String spanName) throws
THROWABLE {
- return enabled ? traceAsyncMethod(action, request, memberId, spanName) :
action.get();
- }
-
- private static Span createServerSpanFromClientRequest(RaftClientRequest
request, String memberId, String spanName) {
- final Context remoteContext =
extractContextFromProto(request.getSpanContext());
- final Span span = getGlobalTracer()
- .spanBuilder(spanName)
- .setParent(remoteContext)
- .setSpanKind(SpanKind.SERVER)
- .startSpan();
- span.setAttribute(RatisAttributes.CLIENT_ID,
String.valueOf(request.getClientId()));
- span.setAttribute(RatisAttributes.CALL_ID,
String.valueOf(request.getCallId()));
- span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
- return span;
- }
-
private static void endSpan(CompletableFuture<?> future, Span span) {
+ if (span == null) {
+ LOG.debug("Span is null, cannot trace the future {}", future);
+ return;
+ }
addListener(future, (resp, error) -> {
- if (error != null) {
- setError(span, error);
- } else {
- span.setStatus(StatusCode.OK);
+ try {
+ if (error != null) {
+ setError(span, error);
+ } else {
+ span.setStatus(StatusCode.OK);
+ }
+ } catch (Throwable t) {
+ LOG.error("Error setting span status, ending span anyway", t);
+ } finally {
+ span.end();
}
- span.end();
});
}
@@ -140,12 +153,8 @@ public final class TraceUtils {
BiConsumer<? super T, ? super Throwable> action) {
future.whenComplete((resp, error) -> {
try {
- // See this post on stack overflow(shorten since the url is too long),
- // https://s.apache.org/completionexception
- // For a chain of CompletableFuture, only the first child
CompletableFuture can get the
- // original exception, others will get a CompletionException, which
wraps the original
- // exception. So here we unwrap it before passing it to the callback
action.
- action.accept(resp, JavaUtils.unwrapCompletionException(error));
+ // https://s.apache.org/completionexception — unwrap
CompletionException for callers
+ action.accept(resp, error == null ? null :
JavaUtils.unwrapCompletionException(error));
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture",
t);
}
@@ -184,4 +193,4 @@ class SpanContextGetter implements
TextMapGetter<SpanContextProto> {
.map(map -> map.get(key)).orElse(null);
}
-}
\ No newline at end of file
+}
diff --git
a/ratis-common/src/test/java/org/apache/ratis/trace/TestTraceUtils.java
b/ratis-common/src/test/java/org/apache/ratis/trace/TestTraceUtils.java
new file mode 100644
index 000000000..71f0f2564
--- /dev/null
+++ b/ratis-common/src/test/java/org/apache/ratis/trace/TestTraceUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class TestTraceUtils {
+
+ @RegisterExtension
+ private static final OpenTelemetryExtension openTelemetryExtension =
+ OpenTelemetryExtension.create();
+
+ private void runTraceAsyncAndAssertClientSpan(boolean tracingEnabled,
boolean expectClientSpan)
+ throws Exception {
+ TraceUtils.setTracerWhenEnabled(tracingEnabled);
+ TraceClient.asyncSend(
+ () -> CompletableFuture.completedFuture("ok"),
+ null,
+ RaftPeerId.valueOf("s0")
+ ).get();
+
+ List<SpanData> spans = openTelemetryExtension.getSpans();
+ boolean hasClientSpan = spans.stream().anyMatch(s -> s.getKind() ==
SpanKind.CLIENT);
+ if (expectClientSpan) {
+ assertTrue(hasClientSpan, "Expected CLIENT span from traceAsyncRpcSend,
got: " + spans);
+ } else {
+ assertFalse(hasClientSpan, "Expected no CLIENT span when tracing
disabled, got: " + spans);
+ }
+ }
+
+ @Test
+ public void testTraceAsyncRpcSendCreatesClientSpan() throws Exception {
+ runTraceAsyncAndAssertClientSpan(true, true);
+ }
+
+ @Test
+ public void testTraceAsyncRpcSendCreatesClientSpanDisabled() throws
Exception {
+ runTraceAsyncAndAssertClientSpan(false, false);
+ }
+}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0b891f658..958da846d 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -100,8 +100,8 @@ import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.TransactionContextImpl;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.trace.TraceServer;
import org.apache.ratis.trace.TraceUtils;
-import org.apache.ratis.trace.TraceConfigKeys;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.ConcurrentUtils;
@@ -258,8 +258,6 @@ class RaftServerImpl implements RaftServer.Division,
private final ExecutorService serverExecutor;
private final ExecutorService clientExecutor;
- private final boolean tracingEnabled;
-
private final AtomicBoolean firstElectionSinceStartup = new
AtomicBoolean(true);
private final ThreadGroup threadGroup;
@@ -286,7 +284,7 @@ class RaftServerImpl implements RaftServer.Division,
this.readOption = RaftServerConfigKeys.Read.option(properties);
this.writeIndexCache = new WriteIndexCache(properties);
this.transactionManager = new TransactionManager(id);
- this.tracingEnabled = TraceConfigKeys.enabled(properties);
+ TraceUtils.setTracerWhenEnabled(properties);
this.leaderElectionMetrics =
LeaderElectionMetrics.getLeaderElectionMetrics(
getMemberId(), state::getLastLeaderElapsedTimeMs);
@@ -948,8 +946,7 @@ class RaftServerImpl implements RaftServer.Division,
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
- return TraceUtils.traceAsyncMethodIfEnabled(
- tracingEnabled,
+ return TraceServer.traceAsyncMethod(
() -> submitClientRequestAsyncInternal(request),
request, getMemberId().toString(),
"raft.server.submitClientRequestAsync");
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpcTracing.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpcTracing.java
new file mode 100644
index 000000000..d7c619cc9
--- /dev/null
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpcTracing.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import org.apache.ratis.RaftAsyncTests;
+import org.apache.ratis.trace.TraceConfigKeys;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+
+@Timeout(100)
+public class TestRaftAsyncWithGrpcTracing extends
RaftAsyncTests<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
+ {
+ TraceConfigKeys.setEnabled(getProperties(), true);
+ }
+
+ @RegisterExtension
+ private static final OpenTelemetryExtension openTelemetryExtension =
+ OpenTelemetryExtension.create();
+
+ /**
+ * Verifies traceAsyncRpcSend creates CLIENT spans when tracing is enabled.
+ * testBasicAppendEntriesAsync uses client.async().send() which goes through
AsyncImpl.send().
+ */
+ @Test
+ public void testBasicAppendEntriesAsync() throws Exception {
+ super.testBasicAppendEntriesAsync();
+ List<SpanData> spans = openTelemetryExtension.getSpans();
+ assertTrue(
+ spans.stream().anyMatch(s -> s.getKind() == SpanKind.CLIENT),
+ "Expected at least one span with SpanKind.CLIENT (from
traceAsyncRpcSend)"
+ );
+ assertTrue(
+ spans.stream().anyMatch(s -> s.getKind() == SpanKind.SERVER),
+ "Expected at least one span with SpanKind.SERVER"
+ );
+ }
+
+ @Test
+ public void testWithLoadAsync() throws Exception {
+ super.testWithLoadAsync();
+ List<SpanData> spans = openTelemetryExtension.getSpans();
+ assertTrue(
+ spans.stream().anyMatch(s -> s.getKind() == SpanKind.CLIENT),
+ "Expected at least one span with SpanKind.CLIENT (from
traceAsyncRpcSend)"
+ );
+ assertTrue(
+ spans.stream().anyMatch(s -> s.getKind() == SpanKind.SERVER),
+ "Expected at least one span with SpanKind.SERVER"
+ );
+ }
+}
\ No newline at end of file
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpcTracing.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpcTracing.java
new file mode 100644
index 000000000..722eae2cc
--- /dev/null
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpcTracing.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import org.apache.ratis.trace.TraceConfigKeys;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+
+public class TestRetryCacheWithGrpcTracing
+ extends TestRetryCacheWithGrpc {
+ @RegisterExtension
+ private static final OpenTelemetryExtension openTelemetryExtension =
+ OpenTelemetryExtension.create();
+ {
+ TraceConfigKeys.setEnabled(getProperties(), true);
+ }
+
+ private List<SpanData> spans;
+
+ @BeforeEach
+ void setUpOpenTelemetry() {
+ GlobalOpenTelemetry.resetForTest();
+ GlobalOpenTelemetry.set(openTelemetryExtension.getOpenTelemetry());
+ }
+
+ @AfterEach
+ void tearDownOpenTelemetry() {
+ GlobalOpenTelemetry.resetForTest();
+ }
+
+ /**
+ * Verifies traceAsyncRpcSend creates CLIENT spans when tracing is enabled.
+ * Uses testInvalidateRepliedCalls which exercises client.async().send()
(traceAsyncRpcSend path).
+ * testBasicRetry uses rpc.sendRequest() (blocking) which bypasses the async
tracing path.
+ */
+ @Test
+ public void testBasicRetry() throws Exception {
+ runWithNewCluster(3, cluster -> new
InvalidateRepliedCallsTest(cluster).run());
+
+ long deadline = System.currentTimeMillis() + 10000;
+ do {
+ spans = openTelemetryExtension.getSpans();
+ if (!spans.isEmpty()) break;
+ Thread.sleep(100);
+ } while (System.currentTimeMillis() < deadline);
+
+ assertTrue(
+ spans.stream().anyMatch(s -> s.getKind() == SpanKind.CLIENT),
+ "Expected at least one span with SpanKind.CLIENT (from
traceAsyncRpcSend)"
+ );
+ assertTrue(
+ spans.stream().anyMatch(s -> s.getKind() == SpanKind.SERVER),
+ "Expected at least one span with SpanKind.SERVER"
+ );
+ }
+}