This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-22120 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 79eb3c990181873074bd25d14b4bcbc0d5311002 Author: Duo Zhang <zhang...@apache.org> AuthorDate: Wed Dec 23 11:51:53 2020 +0800 HBASE-25401 Add trace support for async call in rpc client (#2790) Signed-off-by: Guanghao Zhang <zg...@apache.org> --- .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 75 ++++--- .../hadoop/hbase/ipc/BlockingRpcConnection.java | 21 +- .../java/org/apache/hadoop/hbase/ipc/Call.java | 2 +- .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java | 12 +- .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 8 +- .../org/apache/hadoop/hbase/trace/TraceUtil.java | 4 +- .../src/main/protobuf/rpc/Tracing.proto | 14 +- hbase-server/pom.xml | 10 + .../org/apache/hadoop/hbase/ipc/CallRunner.java | 19 +- .../org/apache/hadoop/hbase/ipc/ServerCall.java | 14 ++ .../hadoop/hbase/ipc/ServerRpcConnection.java | 222 ++++++++++++--------- .../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 59 +++++- .../org/apache/hadoop/hbase/ipc/TestNettyIPC.java | 1 + pom.xml | 12 +- 14 files changed, 299 insertions(+), 174 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index e9ec6a9..3acc6c1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.net.SocketAddress; import java.util.Collection; @@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.Threads; @@ -365,7 +369,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC protected abstract T createConnection(ConnectionId remoteId) throws IOException; private void onCallFinished(Call call, HBaseRpcController hrc, Address addr, - RpcCallback<Message> callback) { + RpcCallback<Message> callback) { call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); if (metrics != null) { metrics.updateRpc(call.md, call.param, call.callStats); @@ -388,44 +392,59 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC } } - Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, + private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, final Message param, Message returnType, final User ticket, final Address addr, final RpcCallback<Message> callback) { - final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); - cs.setStartTime(EnvironmentEdgeManager.currentTime()); - - if (param instanceof ClientProtos.MultiRequest) { - ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param; - int numActions = 0; - for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { - numActions += regionAction.getActionCount(); - } + Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName()) + .startSpan(); + try (Scope scope = span.makeCurrent()) { + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.setStartTime(EnvironmentEdgeManager.currentTime()); + + if (param instanceof ClientProtos.MultiRequest) { + ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param; + int numActions = 0; + for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { + numActions += regionAction.getActionCount(); + } - cs.setNumActionsPerServer(numActions); - } + cs.setNumActionsPerServer(numActions); + } - final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); - Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, + final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); + Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() { @Override public void run(Call call) { - counter.decrementAndGet(); - onCallFinished(call, hrc, addr, callback); + try (Scope scope = call.span.makeCurrent()) { + counter.decrementAndGet(); + onCallFinished(call, hrc, addr, callback); + } finally { + if (hrc.failed()) { + span.setStatus(StatusCode.ERROR); + span.recordException(hrc.getFailed()); + } else { + span.setStatus(StatusCode.OK); + } + span.end(); + } } }, cs); - ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); - int count = counter.incrementAndGet(); - try { - if (count > maxConcurrentCallsPerServer) { - throw new ServerTooBusyException(addr, count); + ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); + int count = counter.incrementAndGet(); + try { + if (count > maxConcurrentCallsPerServer) { + throw new ServerTooBusyException(addr, count); + } + cs.setConcurrentCallsPerServer(count); + T connection = getConnection(remoteId); + connection.sendRequest(call, hrc); + } catch (Exception e) { + call.setException(toIOE(e)); + span.end(); } - cs.setConcurrentCallsPerServer(count); - T connection = getConnection(remoteId); - connection.sendRequest(call, hrc); - } catch (Exception e) { - call.setException(toIOE(e)); + return call; } - return call; } private static Address createAddr(ServerName sn) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index 1a5cb73..eb8e1d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -24,8 +24,6 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException; import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; import static org.apache.hadoop.hbase.ipc.IPCUtil.write; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -57,7 +55,6 @@ import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; -import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.io.IOUtils; @@ -192,8 +189,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { if (call.isDone()) { continue; } - try { - tracedWriteRequest(call); + try (Scope scope = call.span.makeCurrent()) { + writeRequest(call); } catch (IOException e) { // exception here means the call has not been added to the pendingCalls yet, so we need // to fail it by our own. @@ -594,16 +591,6 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream())); } - private void tracedWriteRequest(Call call) throws IOException { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest") - .setParent(Context.current().with(call.span)).startSpan(); - try (Scope scope = span.makeCurrent()) { - writeRequest(call); - } finally { - span.end(); - } - } - /** * Initiates a call by sending the parameter to the remote server. Note: this is not called from * the Connection thread, but by other threads. @@ -811,7 +798,9 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { if (callSender != null) { callSender.sendCall(call); } else { - tracedWriteRequest(call); + // this is in the same thread with the caller so do not need to attach the trace context + // again. + writeRequest(call); } } }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index cf33fff..5f12c73 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -61,7 +61,7 @@ class Call { final Span span; Timeout timeoutTask; - protected Call(int id, final Descriptors.MethodDescriptor md, Message param, + Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, int priority, RpcCallback<Call> callback, MetricsConnection.CallStats callStats) { this.param = param; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index c952f73..de76885 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.ipc; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; @@ -48,6 +50,7 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; /** * Utility to help ipc'ing. @@ -111,11 +114,10 @@ class IPCUtil { static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) { RequestHeader.Builder builder = RequestHeader.newBuilder(); builder.setCallId(call.id); - //TODO handle htrace API change, see HBASE-18895 - /*if (call.span != null) { - builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId()) - .setTraceId(call.span.getTracerId())); - }*/ + RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder(); + GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), + traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value)); + builder.setTraceInfo(traceBuilder.build()); builder.setMethodName(call.md.getName()); builder.setRequestParam(call.param != null); if (cellBlockMeta != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index 2a2df8a..2eb25a7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.ipc; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -114,9 +115,12 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { + throws Exception { if (msg instanceof Call) { - writeRequest(ctx, (Call) msg, promise); + Call call = (Call) msg; + try (Scope scope = call.span.makeCurrent()) { + writeRequest(ctx, call, promise); + } } else { ctx.write(msg, promise); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index f7a111f..768de9c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.trace; -import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Tracer; import org.apache.yetus.audience.InterfaceAudience; @@ -30,6 +30,6 @@ public final class TraceUtil { } public static Tracer getGlobalTracer() { - return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME); + return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME); } } diff --git a/hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto b/hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto index de605d1..276a0a7 100644 --- a/hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto +++ b/hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto @@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos"; option java_generate_equals_and_hash = true; option optimize_for = SPEED; -//Used to pass through the information necessary to continue -//a trace after an RPC is made. All we need is the traceid -//(so we know the overarching trace this message is a part of), and -//the id of the current span when this message was sent, so we know -//what span caused the new span we will create when this message is received. +// OpenTelemetry propagates trace context through https://www.w3.org/TR/trace-context/, which +// is a text-based approach that passes properties with http headers. Here we will also use this +// approach so we just need a map to store the key value pair. + message RPCTInfo { - optional int64 trace_id = 1; - optional int64 parent_id = 2; + optional int64 trace_id = 1 [deprecated = true]; + optional int64 parent_id = 2 [deprecated = true]; + map<string, string> headers = 3; } diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index c06862c..24e9486 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -423,6 +423,16 @@ <scope>test</scope> </dependency> <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <scope>test</scope> diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index ce593e2..fe9d139 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.ipc; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; @@ -72,15 +74,6 @@ public class CallRunner { return call; } - /** - * Keep for backward compatibility. - * @deprecated As of release 2.0, this will be removed in HBase 3.0 - */ - @Deprecated - public ServerCall<?> getCall() { - return (ServerCall<?>) call; - } - public void setStatus(MonitoredRPCHandler status) { this.status = status; } @@ -129,7 +122,8 @@ public class CallRunner { String serviceName = getServiceName(); String methodName = getMethodName(); String traceString = serviceName + "." + methodName; - Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan(); + Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString) + .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan(); try (Scope traceScope = span.makeCurrent()) { if (!this.rpcServer.isStarted()) { InetSocketAddress address = rpcServer.getListenerAddress(); @@ -140,8 +134,12 @@ public class CallRunner { resultPair = this.rpcServer.call(call, this.status); } catch (TimeoutIOException e){ RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call); + span.recordException(e); + span.setStatus(StatusCode.ERROR); return; } catch (Throwable e) { + span.recordException(e); + span.setStatus(StatusCode.ERROR); if (e instanceof ServerNotRunningYetException) { // If ServerNotRunningYetException, don't spew stack trace. if (RpcServer.LOG.isTraceEnabled()) { @@ -160,6 +158,7 @@ public class CallRunner { RpcServer.CurCall.set(null); if (resultPair != null) { this.rpcServer.addCallSize(call.getSize() * -1); + span.setStatus(StatusCode.OK); sucessful = true; } span.end(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index a5c8a39..268a6a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.ipc; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; @@ -99,6 +101,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa // the current implementation. We should fix this in the future. private final AtomicInteger reference = new AtomicInteger(0b01); + private final Span span; + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", justification = "Can't figure why this complaint is happening... see below") ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, @@ -129,6 +133,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa this.bbAllocator = byteBuffAllocator; this.cellBlockBuilder = cellBlockBuilder; this.reqCleanup = reqCleanup; + this.span = Span.current(); } /** @@ -147,6 +152,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa // If the call was run successfuly, we might have already returned the BB // back to pool. No worries..Then inputCellBlock will be null cleanup(); + span.end(); } private void release(int mask) { @@ -226,6 +232,10 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa } if (t != null) { this.isError = true; + span.recordException(t); + span.setStatus(StatusCode.ERROR); + } else { + span.setStatus(StatusCode.OK); } BufferChain bc = null; try { @@ -549,4 +559,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa public synchronized BufferChain getResponse() { return response; } + + public Span getSpan() { + return span; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index 0226de4..7654167 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.HConstants.RPC_HEADER; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapPropagator; import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.DataOutputStream; @@ -31,13 +36,11 @@ import java.nio.channels.ReadableByteChannel; import java.security.GeneralSecurityException; import java.util.Objects; import java.util.Properties; - import org.apache.commons.crypto.cipher.CryptoCipherFactory; import org.apache.commons.crypto.random.CryptoRandom; import org.apache.commons.crypto.random.CryptoRandomFactory; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; @@ -53,6 +56,20 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider; import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders; import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.yetus.audience.InterfaceAudience; + import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; @@ -61,6 +78,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescrip import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; @@ -68,17 +86,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHea import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; /** Reads calls from a connection and queues them for handling. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings( @@ -607,99 +615,115 @@ abstract class ServerRpcConnection implements Closeable { ProtobufUtil.mergeFrom(builder, cis, headerSize); RequestHeader header = (RequestHeader) builder.build(); offset += headerSize; - int id = header.getCallId(); - if (RpcServer.LOG.isTraceEnabled()) { - RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) - + " totalRequestSize: " + totalRequestSize + " bytes"); - } - // Enforcing the call queue size, this triggers a retry in the client - // This is a bit late to be doing this check - we have already read in the - // total request. - if ((totalRequestSize + - this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { - final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null, - totalRequestSize, null, 0, this.callCleanup); - this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); - callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + this.rpcServer.server.getServerName() + - ", is hbase.ipc.server.max.callqueue.size too small?"); - callTooBig.sendResponseIfReady(); - return; - } - MethodDescriptor md = null; - Message param = null; - CellScanner cellScanner = null; - try { - if (header.hasRequestParam() && header.getRequestParam()) { - md = this.service.getDescriptorForType().findMethodByName( - header.getMethodName()); - if (md == null) - throw new UnsupportedOperationException(header.getMethodName()); - builder = this.service.getRequestPrototype(md).newBuilderForType(); - cis.resetSizeCounter(); - int paramSize = cis.readRawVarint32(); - offset += cis.getTotalBytesRead(); - if (builder != null) { - ProtobufUtil.mergeFrom(builder, cis, paramSize); - param = builder.build(); - } - offset += paramSize; - } else { - // currently header must have request param, so we directly throw - // exception here - String msg = "Invalid request header: " - + TextFormat.shortDebugString(header) - + ", should have param set in it"; - RpcServer.LOG.warn(msg); - throw new DoNotRetryIOException(msg); + TextMapPropagator.Getter<RPCTInfo> getter = new TextMapPropagator.Getter<RPCTInfo>() { + + @Override + public Iterable<String> keys(RPCTInfo carrier) { + return carrier.getHeadersMap().keySet(); } - if (header.hasCellBlockMeta()) { - buf.position(offset); - ByteBuff dup = buf.duplicate(); - dup.limit(offset + header.getCellBlockMeta().getLength()); - cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers( - this.codec, this.compressionCodec, dup); + + @Override + public String get(RPCTInfo carrier, String key) { + return carrier.getHeadersMap().get(key); } - } catch (Throwable t) { - InetSocketAddress address = this.rpcServer.getListenerAddress(); - String msg = (address != null ? address : "(channel closed)") - + " is unable to read call parameter from client " - + getHostAddress(); - RpcServer.LOG.warn(msg, t); - - this.rpcServer.metrics.exception(t); - - // probably the hbase hadoop version does not match the running hadoop - // version - if (t instanceof LinkageError) { - t = new DoNotRetryIOException(t); + }; + Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() + .extract(Context.current(), header.getTraceInfo(), getter); + Span span = + TraceUtil.getGlobalTracer().spanBuilder("RpcServer.process").setParent(traceCtx).startSpan(); + try (Scope scope = span.makeCurrent()) { + int id = header.getCallId(); + if (RpcServer.LOG.isTraceEnabled()) { + RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) + + " totalRequestSize: " + totalRequestSize + " bytes"); } - // If the method is not present on the server, do not retry. - if (t instanceof UnsupportedOperationException) { - t = new DoNotRetryIOException(t); + // Enforcing the call queue size, this triggers a retry in the client + // This is a bit late to be doing this check - we have already read in the + // total request. + if ((totalRequestSize + + this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { + final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null, + totalRequestSize, null, 0, this.callCleanup); + this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); + callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + this.rpcServer.server.getServerName() + + ", is hbase.ipc.server.max.callqueue.size too small?"); + callTooBig.sendResponseIfReady(); + return; } + MethodDescriptor md = null; + Message param = null; + CellScanner cellScanner = null; + try { + if (header.hasRequestParam() && header.getRequestParam()) { + md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); + if (md == null) { + throw new UnsupportedOperationException(header.getMethodName()); + } + builder = this.service.getRequestPrototype(md).newBuilderForType(); + cis.resetSizeCounter(); + int paramSize = cis.readRawVarint32(); + offset += cis.getTotalBytesRead(); + if (builder != null) { + ProtobufUtil.mergeFrom(builder, cis, paramSize); + param = builder.build(); + } + offset += paramSize; + } else { + // currently header must have request param, so we directly throw + // exception here + String msg = "Invalid request header: " + TextFormat.shortDebugString(header) + + ", should have param set in it"; + RpcServer.LOG.warn(msg); + throw new DoNotRetryIOException(msg); + } + if (header.hasCellBlockMeta()) { + buf.position(offset); + ByteBuff dup = buf.duplicate(); + dup.limit(offset + header.getCellBlockMeta().getLength()); + cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec, + this.compressionCodec, dup); + } + } catch (Throwable t) { + InetSocketAddress address = this.rpcServer.getListenerAddress(); + String msg = (address != null ? address : "(channel closed)") + + " is unable to read call parameter from client " + getHostAddress(); + RpcServer.LOG.warn(msg, t); + + this.rpcServer.metrics.exception(t); + + // probably the hbase hadoop version does not match the running hadoop + // version + if (t instanceof LinkageError) { + t = new DoNotRetryIOException(t); + } + // If the method is not present on the server, do not retry. + if (t instanceof UnsupportedOperationException) { + t = new DoNotRetryIOException(t); + } - ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null, - totalRequestSize, null, 0, this.callCleanup); - readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage()); - readParamsFailedCall.sendResponseIfReady(); - return; - } - - int timeout = 0; - if (header.hasTimeout() && header.getTimeout() > 0) { - timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); - } - ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize, - this.addr, timeout, this.callCleanup); + ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null, + totalRequestSize, null, 0, this.callCleanup); + readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage()); + readParamsFailedCall.sendResponseIfReady(); + return; + } - if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { - this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); - this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); - call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + this.rpcServer.server.getServerName() + + int timeout = 0; + if (header.hasTimeout() && header.getTimeout() > 0) { + timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); + } + ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, + totalRequestSize, this.addr, timeout, this.callCleanup); + + if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { + this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); + this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); + call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + this.rpcServer.server.getServerName() + ", too many items queued ?"); - call.sendResponseIfReady(); + call.sendResponseIfReady(); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 87561ba..11978ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -20,21 +20,28 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.anyObject; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -43,10 +50,12 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +96,10 @@ public abstract class AbstractTestIPC { protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf); + + @Rule + public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); + /** * Ensure we do not HAVE TO HAVE a codec. */ @@ -183,7 +196,7 @@ public abstract class AbstractTestIPC { RpcServer rpcServer = createRpcServer(null, "testRpcServer", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler); - verify(scheduler).init((RpcScheduler.Context) anyObject()); + verify(scheduler).init(any(RpcScheduler.Context.class)); try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); verify(scheduler).start(); @@ -192,7 +205,7 @@ public abstract class AbstractTestIPC { for (int i = 0; i < 10; i++) { stub.echo(null, param); } - verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); + verify(scheduler, times(10)).dispatch(any(CallRunner.class)); } finally { rpcServer.stop(); verify(scheduler).stop(); @@ -427,4 +440,44 @@ public abstract class AbstractTestIPC { } } + private void assertSameTraceId() { + String traceId = traceRule.getSpans().get(0).getTraceId(); + for (SpanData data : traceRule.getSpans()) { + // assert we are the same trace + assertEquals(traceId, data.getTraceId()); + } + } + + @Test + public void testTracing() throws IOException, ServiceException { + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { + rpcServer.start(); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); + Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) + .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.pause"))); + + assertSameTraceId(); + for (SpanData data : traceRule.getSpans()) { + assertThat( + TimeUnit.NANOSECONDS.toMillis(data.getEndEpochNanos() - data.getStartEpochNanos()), + greaterThanOrEqualTo(100L)); + assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); + } + + traceRule.clearSpans(); + assertThrows(ServiceException.class, + () -> stub.error(null, EmptyRequestProto.getDefaultInstance())); + Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) + .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.error"))); + + assertSameTraceId(); + for (SpanData data : traceRule.getSpans()) { + assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode()); + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java index 2601fba..c3b52a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java @@ -39,6 +39,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; + import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel; diff --git a/pom.xml b/pom.xml index 55323e9..785065a 100755 --- a/pom.xml +++ b/pom.xml @@ -1652,7 +1652,7 @@ <jruby.version>9.2.13.0</jruby.version> <junit.version>4.13</junit.version> <hamcrest.version>1.3</hamcrest.version> - <opentelemetry.version>0.12.0</opentelemetry.version> + <opentelemetry.version>0.13.1</opentelemetry.version> <log4j.version>1.2.17</log4j.version> <mockito-core.version>2.28.2</mockito-core.version> <protobuf.plugin.version>0.6.1</protobuf.plugin.version> @@ -2324,6 +2324,16 @@ <version>${opentelemetry.version}</version> </dependency> <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk</artifactId> + <version>${opentelemetry.version}</version> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-testing</artifactId> + <version>${opentelemetry.version}</version> + </dependency> + <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>${disruptor.version}</version>