This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-22120
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 79eb3c990181873074bd25d14b4bcbc0d5311002
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Wed Dec 23 11:51:53 2020 +0800

    HBASE-25401 Add trace support for async call in rpc client (#2790)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |  75 ++++---
 .../hadoop/hbase/ipc/BlockingRpcConnection.java    |  21 +-
 .../java/org/apache/hadoop/hbase/ipc/Call.java     |   2 +-
 .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java  |  12 +-
 .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java    |   8 +-
 .../org/apache/hadoop/hbase/trace/TraceUtil.java   |   4 +-
 .../src/main/protobuf/rpc/Tracing.proto            |  14 +-
 hbase-server/pom.xml                               |  10 +
 .../org/apache/hadoop/hbase/ipc/CallRunner.java    |  19 +-
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  14 ++
 .../hadoop/hbase/ipc/ServerRpcConnection.java      | 222 ++++++++++++---------
 .../apache/hadoop/hbase/ipc/AbstractTestIPC.java   |  59 +++++-
 .../org/apache/hadoop/hbase/ipc/TestNettyIPC.java  |   1 +
 pom.xml                                            |  12 +-
 14 files changed, 299 insertions(+), 174 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index e9ec6a9..3acc6c1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.ipc;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
 
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Collection;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.codec.KeyValueCodec;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.Threads;
@@ -365,7 +369,7 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
   protected abstract T createConnection(ConnectionId remoteId) throws 
IOException;
 
   private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
-      RpcCallback<Message> callback) {
+    RpcCallback<Message> callback) {
     call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - 
call.getStartTime());
     if (metrics != null) {
       metrics.updateRpc(call.md, call.param, call.callStats);
@@ -388,44 +392,59 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
     }
   }
 
-  Call callMethod(final Descriptors.MethodDescriptor md, final 
HBaseRpcController hrc,
+  private Call callMethod(final Descriptors.MethodDescriptor md, final 
HBaseRpcController hrc,
       final Message param, Message returnType, final User ticket,
       final Address addr, final RpcCallback<Message> callback) {
-    final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
-    cs.setStartTime(EnvironmentEdgeManager.currentTime());
-
-    if (param instanceof ClientProtos.MultiRequest) {
-      ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
-      int numActions = 0;
-      for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) 
{
-        numActions += regionAction.getActionCount();
-      }
+    Span span = 
TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + 
md.getFullName())
+      .startSpan();
+    try (Scope scope = span.makeCurrent()) {
+      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+      cs.setStartTime(EnvironmentEdgeManager.currentTime());
+
+      if (param instanceof ClientProtos.MultiRequest) {
+        ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
+        int numActions = 0;
+        for (ClientProtos.RegionAction regionAction : 
req.getRegionActionList()) {
+          numActions += regionAction.getActionCount();
+        }
 
-      cs.setNumActionsPerServer(numActions);
-    }
+        cs.setNumActionsPerServer(numActions);
+      }
 
-    final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
-    Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), 
returnType,
+      final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
+      Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), 
returnType,
         hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
           @Override
           public void run(Call call) {
-            counter.decrementAndGet();
-            onCallFinished(call, hrc, addr, callback);
+            try (Scope scope = call.span.makeCurrent()) {
+              counter.decrementAndGet();
+              onCallFinished(call, hrc, addr, callback);
+            } finally {
+              if (hrc.failed()) {
+                span.setStatus(StatusCode.ERROR);
+                span.recordException(hrc.getFailed());
+              } else {
+                span.setStatus(StatusCode.OK);
+              }
+              span.end();
+            }
           }
         }, cs);
-    ConnectionId remoteId = new ConnectionId(ticket, 
md.getService().getName(), addr);
-    int count = counter.incrementAndGet();
-    try {
-      if (count > maxConcurrentCallsPerServer) {
-        throw new ServerTooBusyException(addr, count);
+      ConnectionId remoteId = new ConnectionId(ticket, 
md.getService().getName(), addr);
+      int count = counter.incrementAndGet();
+      try {
+        if (count > maxConcurrentCallsPerServer) {
+          throw new ServerTooBusyException(addr, count);
+        }
+        cs.setConcurrentCallsPerServer(count);
+        T connection = getConnection(remoteId);
+        connection.sendRequest(call, hrc);
+      } catch (Exception e) {
+        call.setException(toIOE(e));
+        // no span.end() here: setException runs the callback, which ends the span
       }
-      cs.setConcurrentCallsPerServer(count);
-      T connection = getConnection(remoteId);
-      connection.sendRequest(call, hrc);
-    } catch (Exception e) {
-      call.setException(toIOE(e));
+      return call;
     }
-    return call;
   }
 
   private static Address createAddr(ServerName sn) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index 1a5cb73..eb8e1d9 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -24,8 +24,6 @@ import static 
org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
 
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -57,7 +55,6 @@ import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
 import 
org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
-import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.io.IOUtils;
@@ -192,8 +189,8 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
           if (call.isDone()) {
             continue;
           }
-          try {
-            tracedWriteRequest(call);
+          try (Scope scope = call.span.makeCurrent()) {
+            writeRequest(call);
           } catch (IOException e) {
             // exception here means the call has not been added to the 
pendingCalls yet, so we need
             // to fail it by our own.
@@ -594,16 +591,6 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
     this.out = new DataOutputStream(new 
BufferedOutputStream(saslRpcClient.getOutputStream()));
   }
 
-  private void tracedWriteRequest(Call call) throws IOException {
-    Span span = 
TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
-      .setParent(Context.current().with(call.span)).startSpan();
-    try (Scope scope = span.makeCurrent()) {
-      writeRequest(call);
-    } finally {
-      span.end();
-    }
-  }
-
   /**
    * Initiates a call by sending the parameter to the remote server. Note: 
this is not called from
    * the Connection thread, but by other threads.
@@ -811,7 +798,9 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
         if (callSender != null) {
           callSender.sendCall(call);
         } else {
-          tracedWriteRequest(call);
+          // this is in the same thread with the caller so do not need to 
attach the trace context
+          // again.
+          writeRequest(call);
         }
       }
     });
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index cf33fff..5f12c73 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -61,7 +61,7 @@ class Call {
   final Span span;
   Timeout timeoutTask;
 
-  protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
+  Call(int id, final Descriptors.MethodDescriptor md, Message param,
       final CellScanner cells, final Message responseDefaultType, int timeout, 
int priority,
       RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
     this.param = param;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index c952f73..de76885 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.context.Context;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
@@ -48,6 +50,7 @@ import 
org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
 
 /**
  * Utility to help ipc'ing.
@@ -111,11 +114,10 @@ class IPCUtil {
   static RequestHeader buildRequestHeader(Call call, CellBlockMeta 
cellBlockMeta) {
     RequestHeader.Builder builder = RequestHeader.newBuilder();
     builder.setCallId(call.id);
-    //TODO handle htrace API change, see HBASE-18895
-    /*if (call.span != null) {
-      
builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
-          .setTraceId(call.span.getTracerId()));
-    }*/
+    RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
+    
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
+      traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
+    builder.setTraceInfo(traceBuilder.build());
     builder.setMethodName(call.md.getName());
     builder.setRequestParam(call.param != null);
     if (cellBlockMeta != null) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index 2a2df8a..2eb25a7 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -114,9 +115,12 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
 
   @Override
   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise)
-      throws Exception {
+    throws Exception {
     if (msg instanceof Call) {
-      writeRequest(ctx, (Call) msg, promise);
+      Call call = (Call) msg;
+      try (Scope scope = call.span.makeCurrent()) {
+        writeRequest(ctx, call, promise);
+      }
     } else {
       ctx.write(msg, promise);
     }
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
index f7a111f..768de9c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.trace;
 
-import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.api.trace.Tracer;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -30,6 +30,6 @@ public final class TraceUtil {
   }
 
   public static Tracer getGlobalTracer() {
-    return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
+    return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
   }
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto 
b/hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto
index de605d1..276a0a7 100644
--- a/hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto
@@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos";
 option java_generate_equals_and_hash = true;
 option optimize_for = SPEED;
 
-//Used to pass through the information necessary to continue
-//a trace after an RPC is made. All we need is the traceid
-//(so we know the overarching trace this message is a part of), and
-//the id of the current span when this message was sent, so we know
-//what span caused the new span we will create when this message is received.
+// OpenTelemetry propagates trace context through 
https://www.w3.org/TR/trace-context/, which
+// is a text-based approach that passes properties with http headers. Here we 
will also use this
+// approach so we just need a map to store the key value pair.
+
 message RPCTInfo {
-  optional int64 trace_id = 1;
-  optional int64 parent_id = 2;
+  optional int64 trace_id = 1 [deprecated = true];
+  optional int64 parent_id = 2 [deprecated = true];
+  map<string, string> headers = 3;
 }
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index c06862c..24e9486 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -423,6 +423,16 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>test</scope>
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index ce593e2..fe9d139 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.ipc;
 
 import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
@@ -72,15 +74,6 @@ public class CallRunner {
     return call;
   }
 
-  /**
-   * Keep for backward compatibility.
-   * @deprecated As of release 2.0, this will be removed in HBase 3.0
-   */
-  @Deprecated
-  public ServerCall<?> getCall() {
-    return (ServerCall<?>) call;
-  }
-
   public void setStatus(MonitoredRPCHandler status) {
     this.status = status;
   }
@@ -129,7 +122,8 @@ public class CallRunner {
       String serviceName = getServiceName();
       String methodName = getMethodName();
       String traceString = serviceName + "." + methodName;
-      Span span = 
TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
+      Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
+        .setParent(Context.current().with(((ServerCall<?>) 
call).getSpan())).startSpan();
       try (Scope traceScope = span.makeCurrent()) {
         if (!this.rpcServer.isStarted()) {
           InetSocketAddress address = rpcServer.getListenerAddress();
@@ -140,8 +134,12 @@ public class CallRunner {
         resultPair = this.rpcServer.call(call, this.status);
       } catch (TimeoutIOException e){
         RpcServer.LOG.warn("Can not complete this request in time, drop it: " 
+ call);
+        span.recordException(e);
+        span.setStatus(StatusCode.ERROR);
         return;
       } catch (Throwable e) {
+        span.recordException(e);
+        span.setStatus(StatusCode.ERROR);
         if (e instanceof ServerNotRunningYetException) {
           // If ServerNotRunningYetException, don't spew stack trace.
           if (RpcServer.LOG.isTraceEnabled()) {
@@ -160,6 +158,7 @@ public class CallRunner {
         RpcServer.CurCall.set(null);
         if (resultPair != null) {
           this.rpcServer.addCallSize(call.getSize() * -1);
+          span.setStatus(StatusCode.OK);
           sucessful = true;
         }
         span.end();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index a5c8a39..268a6a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -99,6 +101,8 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
   // the current implementation. We should fix this in the future.
   private final AtomicInteger reference = new AtomicInteger(0b01);
 
+  private final Span span;
+
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"NP_NULL_ON_SOME_PATH",
       justification = "Can't figure why this complaint is happening... see 
below")
   ServerCall(int id, BlockingService service, MethodDescriptor md, 
RequestHeader header,
@@ -129,6 +133,7 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
     this.bbAllocator = byteBuffAllocator;
     this.cellBlockBuilder = cellBlockBuilder;
     this.reqCleanup = reqCleanup;
+    this.span = Span.current();
   }
 
   /**
@@ -147,6 +152,7 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
     // If the call was run successfuly, we might have already returned the BB
     // back to pool. No worries..Then inputCellBlock will be null
     cleanup();
+    span.end();
   }
 
   private void release(int mask) {
@@ -226,6 +232,10 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
     }
     if (t != null) {
       this.isError = true;
+      span.recordException(t);
+      span.setStatus(StatusCode.ERROR);
+    } else {
+      span.setStatus(StatusCode.OK);
     }
     BufferChain bc = null;
     try {
@@ -549,4 +559,8 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
   public synchronized BufferChain getResponse() {
     return response;
   }
+
+  public Span getSpan() {
+    return span;
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
index 0226de4..7654167 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
 
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapPropagator;
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.DataOutputStream;
@@ -31,13 +36,11 @@ import java.nio.channels.ReadableByteChannel;
 import java.security.GeneralSecurityException;
 import java.util.Objects;
 import java.util.Properties;
-
 import org.apache.commons.crypto.cipher.CryptoCipherFactory;
 import org.apache.commons.crypto.random.CryptoRandom;
 import org.apache.commons.crypto.random.CryptoRandomFactory;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
@@ -53,6 +56,20 @@ import org.apache.hadoop.hbase.security.User;
 import 
org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
 import 
org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
 import 
org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
@@ -61,6 +78,7 @@ import 
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescrip
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@@ -68,17 +86,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHea
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
 
 /** Reads calls from a connection and queues them for handling. */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
@@ -607,99 +615,115 @@ abstract class ServerRpcConnection implements Closeable {
     ProtobufUtil.mergeFrom(builder, cis, headerSize);
     RequestHeader header = (RequestHeader) builder.build();
     offset += headerSize;
-    int id = header.getCallId();
-    if (RpcServer.LOG.isTraceEnabled()) {
-      RpcServer.LOG.trace("RequestHeader " + 
TextFormat.shortDebugString(header)
-          + " totalRequestSize: " + totalRequestSize + " bytes");
-    }
-    // Enforcing the call queue size, this triggers a retry in the client
-    // This is a bit late to be doing this check - we have already read in the
-    // total request.
-    if ((totalRequestSize +
-        this.rpcServer.callQueueSizeInBytes.sum()) > 
this.rpcServer.maxQueueSizeInBytes) {
-      final ServerCall<?> callTooBig = createCall(id, this.service, null, 
null, null, null,
-        totalRequestSize, null, 0, this.callCleanup);
-      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
-      callTooBig.setResponse(null, null,  
RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
-        "Call queue is full on " + this.rpcServer.server.getServerName() +
-        ", is hbase.ipc.server.max.callqueue.size too small?");
-      callTooBig.sendResponseIfReady();
-      return;
-    }
-    MethodDescriptor md = null;
-    Message param = null;
-    CellScanner cellScanner = null;
-    try {
-      if (header.hasRequestParam() && header.getRequestParam()) {
-        md = this.service.getDescriptorForType().findMethodByName(
-            header.getMethodName());
-        if (md == null)
-          throw new UnsupportedOperationException(header.getMethodName());
-        builder = this.service.getRequestPrototype(md).newBuilderForType();
-        cis.resetSizeCounter();
-        int paramSize = cis.readRawVarint32();
-        offset += cis.getTotalBytesRead();
-        if (builder != null) {
-          ProtobufUtil.mergeFrom(builder, cis, paramSize);
-          param = builder.build();
-        }
-        offset += paramSize;
-      } else {
-        // currently header must have request param, so we directly throw
-        // exception here
-        String msg = "Invalid request header: "
-            + TextFormat.shortDebugString(header)
-            + ", should have param set in it";
-        RpcServer.LOG.warn(msg);
-        throw new DoNotRetryIOException(msg);
+    TextMapPropagator.Getter<RPCTInfo> getter = new 
TextMapPropagator.Getter<RPCTInfo>() {
+
+      @Override
+      public Iterable<String> keys(RPCTInfo carrier) {
+        return carrier.getHeadersMap().keySet();
       }
-      if (header.hasCellBlockMeta()) {
-        buf.position(offset);
-        ByteBuff dup = buf.duplicate();
-        dup.limit(offset + header.getCellBlockMeta().getLength());
-        cellScanner = 
this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(
-            this.codec, this.compressionCodec, dup);
+
+      @Override
+      public String get(RPCTInfo carrier, String key) {
+        return carrier.getHeadersMap().get(key);
       }
-    } catch (Throwable t) {
-      InetSocketAddress address = this.rpcServer.getListenerAddress();
-      String msg = (address != null ? address : "(channel closed)")
-          + " is unable to read call parameter from client "
-          + getHostAddress();
-      RpcServer.LOG.warn(msg, t);
-
-      this.rpcServer.metrics.exception(t);
-
-      // probably the hbase hadoop version does not match the running hadoop
-      // version
-      if (t instanceof LinkageError) {
-        t = new DoNotRetryIOException(t);
+    };
+    Context traceCtx = 
GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
+      .extract(Context.current(), header.getTraceInfo(), getter);
+    Span span =
+      
TraceUtil.getGlobalTracer().spanBuilder("RpcServer.process").setParent(traceCtx).startSpan();
+    try (Scope scope = span.makeCurrent()) {
+      int id = header.getCallId();
+      if (RpcServer.LOG.isTraceEnabled()) {
+        RpcServer.LOG.trace("RequestHeader " + 
TextFormat.shortDebugString(header) +
+          " totalRequestSize: " + totalRequestSize + " bytes");
       }
-      // If the method is not present on the server, do not retry.
-      if (t instanceof UnsupportedOperationException) {
-        t = new DoNotRetryIOException(t);
+      // Enforcing the call queue size, this triggers a retry in the client
+      // This is a bit late to be doing this check - we have already read in 
the
+      // total request.
+      if ((totalRequestSize +
+        this.rpcServer.callQueueSizeInBytes.sum()) > 
this.rpcServer.maxQueueSizeInBytes) {
+        final ServerCall<?> callTooBig = createCall(id, this.service, null, 
null, null, null,
+          totalRequestSize, null, 0, this.callCleanup);
+        
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
+        callTooBig.setResponse(null, null, 
RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+          "Call queue is full on " + this.rpcServer.server.getServerName() +
+            ", is hbase.ipc.server.max.callqueue.size too small?");
+        callTooBig.sendResponseIfReady();
+        return;
       }
+      MethodDescriptor md = null;
+      Message param = null;
+      CellScanner cellScanner = null;
+      try {
+        if (header.hasRequestParam() && header.getRequestParam()) {
+          md = 
this.service.getDescriptorForType().findMethodByName(header.getMethodName());
+          if (md == null) {
+            throw new UnsupportedOperationException(header.getMethodName());
+          }
+          builder = this.service.getRequestPrototype(md).newBuilderForType();
+          cis.resetSizeCounter();
+          int paramSize = cis.readRawVarint32();
+          offset += cis.getTotalBytesRead();
+          if (builder != null) {
+            ProtobufUtil.mergeFrom(builder, cis, paramSize);
+            param = builder.build();
+          }
+          offset += paramSize;
+        } else {
+          // currently header must have request param, so we directly throw
+          // exception here
+          String msg = "Invalid request header: " + 
TextFormat.shortDebugString(header) +
+            ", should have param set in it";
+          RpcServer.LOG.warn(msg);
+          throw new DoNotRetryIOException(msg);
+        }
+        if (header.hasCellBlockMeta()) {
+          buf.position(offset);
+          ByteBuff dup = buf.duplicate();
+          dup.limit(offset + header.getCellBlockMeta().getLength());
+          cellScanner = 
this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
+            this.compressionCodec, dup);
+        }
+      } catch (Throwable t) {
+        InetSocketAddress address = this.rpcServer.getListenerAddress();
+        String msg = (address != null ? address : "(channel closed)") +
+          " is unable to read call parameter from client " + getHostAddress();
+        RpcServer.LOG.warn(msg, t);
+
+        this.rpcServer.metrics.exception(t);
+
+        // probably the hbase hadoop version does not match the running hadoop
+        // version
+        if (t instanceof LinkageError) {
+          t = new DoNotRetryIOException(t);
+        }
+        // If the method is not present on the server, do not retry.
+        if (t instanceof UnsupportedOperationException) {
+          t = new DoNotRetryIOException(t);
+        }
 
-      ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, 
null, null, null,
-        totalRequestSize, null, 0, this.callCleanup);
-      readParamsFailedCall.setResponse(null, null, t, msg + "; " + 
t.getMessage());
-      readParamsFailedCall.sendResponseIfReady();
-      return;
-    }
-
-    int timeout = 0;
-    if (header.hasTimeout() && header.getTimeout() > 0) {
-      timeout = Math.max(this.rpcServer.minClientRequestTimeout, 
header.getTimeout());
-    }
-    ServerCall<?> call = createCall(id, this.service, md, header, param, 
cellScanner, totalRequestSize,
-      this.addr, timeout, this.callCleanup);
+        ServerCall<?> readParamsFailedCall = createCall(id, this.service, 
null, null, null, null,
+          totalRequestSize, null, 0, this.callCleanup);
+        readParamsFailedCall.setResponse(null, null, t, msg + "; " + 
t.getMessage());
+        readParamsFailedCall.sendResponseIfReady();
+        return;
+      }
 
-    if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, 
call))) {
-      this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
-      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
-      call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
-        "Call queue is full on " + this.rpcServer.server.getServerName() +
+      int timeout = 0;
+      if (header.hasTimeout() && header.getTimeout() > 0) {
+        timeout = Math.max(this.rpcServer.minClientRequestTimeout, 
header.getTimeout());
+      }
+      ServerCall<?> call = createCall(id, this.service, md, header, param, 
cellScanner,
+        totalRequestSize, this.addr, timeout, this.callCleanup);
+
+      if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, 
call))) {
+        this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
+        
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
+        call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+          "Call queue is full on " + this.rpcServer.server.getServerName() +
             ", too many items queued ?");
-      call.sendResponseIfReady();
+        call.sendResponseIfReady();
+      }
     }
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 87561ba..11978ca 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -20,21 +20,28 @@ package org.apache.hadoop.hbase.ipc;
 import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
 import static 
org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
 import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -43,10 +50,12 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,6 +96,10 @@ public abstract class AbstractTestIPC {
 
   protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration 
conf);
 
+  /** Span collector used by testTracing; its spans are read with getSpans()/clearSpans(). */
+  @Rule
+  public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
+
   /**
    * Ensure we do not HAVE TO HAVE a codec.
    */
@@ -183,7 +196,7 @@ public abstract class AbstractTestIPC {
     RpcServer rpcServer = createRpcServer(null, "testRpcServer",
         Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
             SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 
scheduler);
-    verify(scheduler).init((RpcScheduler.Context) anyObject());
+    verify(scheduler).init(any(RpcScheduler.Context.class));
     try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
       rpcServer.start();
       verify(scheduler).start();
@@ -192,7 +205,7 @@ public abstract class AbstractTestIPC {
       for (int i = 0; i < 10; i++) {
         stub.echo(null, param);
       }
-      verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
+      verify(scheduler, times(10)).dispatch(any(CallRunner.class));
     } finally {
       rpcServer.stop();
       verify(scheduler).stop();
@@ -427,4 +440,58 @@ public abstract class AbstractTestIPC {
     }
   }

+  /**
+   * Asserts that every span currently collected by {@link #traceRule} belongs to one trace.
+   * Callers must first wait until at least one span has been recorded.
+   */
+  private void assertSameTraceId() {
+    List<SpanData> spans = traceRule.getSpans();
+    assertFalse("no spans were recorded", spans.isEmpty());
+    String traceId = spans.get(0).getTraceId();
+    for (SpanData data : spans) {
+      // all spans recorded for one call are expected to share the first span's trace id
+      assertEquals(traceId, data.getTraceId());
+    }
+  }
+
+  /**
+   * Verifies that a successful call and a failing call each produce spans within a single trace,
+   * with status OK and ERROR respectively. The server is always stopped, even on failure.
+   */
+  @Test
+  public void testTracing() throws IOException, ServiceException {
+    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
+      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
+      rpcServer.start();
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      // pause for 100ms: every span covering this call is asserted below to last >= 100ms
+      stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
+      Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName)
+        .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.pause")));
+
+      assertSameTraceId();
+      for (SpanData data : traceRule.getSpans()) {
+        assertThat(
+          TimeUnit.NANOSECONDS.toMillis(data.getEndEpochNanos() - data.getStartEpochNanos()),
+          greaterThanOrEqualTo(100L));
+        assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
+      }
+
+      // drop the pause() spans so only the error() call's spans are checked below
+      traceRule.clearSpans();
+      assertThrows(ServiceException.class,
+        () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
+      Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName)
+        .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.error")));
+
+      assertSameTraceId();
+      for (SpanData data : traceRule.getSpans()) {
+        assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
index 2601fba..c3b52a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
@@ -39,6 +39,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
diff --git a/pom.xml b/pom.xml
index 55323e9..785065a 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1652,7 +1652,7 @@
     <jruby.version>9.2.13.0</jruby.version>
     <junit.version>4.13</junit.version>
     <hamcrest.version>1.3</hamcrest.version>
-    <opentelemetry.version>0.12.0</opentelemetry.version>
+    <opentelemetry.version>0.13.1</opentelemetry.version>
     <log4j.version>1.2.17</log4j.version>
     <mockito-core.version>2.28.2</mockito-core.version>
     <protobuf.plugin.version>0.6.1</protobuf.plugin.version>
@@ -2324,6 +2324,16 @@
         <version>${opentelemetry.version}</version>
       </dependency>
       <dependency>
+        <groupId>io.opentelemetry</groupId>
+        <artifactId>opentelemetry-sdk</artifactId>
+        <version>${opentelemetry.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.opentelemetry</groupId>
+        <artifactId>opentelemetry-sdk-testing</artifactId>
+        <version>${opentelemetry.version}</version>
+      </dependency>
+      <dependency>
         <groupId>com.lmax</groupId>
         <artifactId>disruptor</artifactId>
         <version>${disruptor.version}</version>

Reply via email to