This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a commit to branch HBASE-25853
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit c96d8fdbd574c354d3722e19465b120cfc575388
Author: Tak Lon (Stephen) Wu <tak...@apache.org>
AuthorDate: Wed Aug 4 18:25:24 2021 -0700

    HBASE-26127 Backport HBASE-23898 "Add trace support for simple apis i… (#3556)

    4/17 commits of HBASE-22120, original commit 805b2ae2ad0f6325515d46043ff01e4e2c7a9f59

    Co-authored-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Duo Zhang <zhang...@apache.org>
---
 hbase-client/pom.xml                               |  10 +
 .../hadoop/hbase/client/AsyncConnection.java       |   4 +-
 .../hadoop/hbase/client/AsyncConnectionImpl.java   | 106 +++---
 .../hadoop/hbase/client/AsyncRegionLocator.java    | 166 +++++---
 .../org/apache/hadoop/hbase/client/AsyncTable.java |  31 +-
 .../hbase/client/AsyncTableRegionLocatorImpl.java  |  18 +-
 .../hadoop/hbase/client/ConnectionFactory.java     |  53 +--
 .../hadoop/hbase/client/RawAsyncTableImpl.java     | 345 +++++++++--------
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |   9 +-
 .../client/TestAsyncRegionLocatorTracing.java      | 157 ++++++++
 .../hadoop/hbase/client/TestAsyncTableTracing.java | 417 +++++++++++++++++++++
 .../org/apache/hadoop/hbase/trace/TraceUtil.java   | 134 +++++++
 .../org/apache/hadoop/hbase/ipc/CallRunner.java    |   7 +-
 .../hadoop/hbase/ipc/ServerRpcConnection.java      |   3 +-
 .../apache/hadoop/hbase/ipc/AbstractTestIPC.java   |  24 +-
 pom.xml                                            |   3 +-
 16 files changed, 1159 insertions(+), 328 deletions(-)

diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index e62a7d1..01a74cc 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -157,6 +157,16 @@
       <artifactId>joni</artifactId>
     </dependency>
     <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jcl-over-slf4j</artifactId>
       <scope>test</scope>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 75971ad..d04b5f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -64,8 +64,8 @@ public interface AsyncConnection extends Closeable {
   /**
    * Retrieve an {@link AsyncTable} implementation for accessing a table.
    * <p>
-   * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if
-   * you want to customize some configs.
+   * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if you
+   * want to customize some configs.
    * <p>
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
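The commit wraps the async client's public operations in OpenTelemetry spans via new TraceUtil helpers. The TraceUtil.java listed in the diffstat (134 lines added) is not part of this excerpt, so the following is only a minimal sketch of what a synchronous wrapper like the TraceUtil.trace(runnable, spanName) calls used below plausibly does; the class name and tracer name are illustrative assumptions, not the committed code:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;

public final class TraceUtilSketch {
  public static void trace(Runnable action, String spanName) {
    // Hypothetical stand-in for the committed TraceUtil.trace helper.
    Span span = GlobalOpenTelemetry.getTracer("org.apache.hbase").spanBuilder(spanName).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      action.run();                    // run the wrapped operation with the span current
      span.setStatus(StatusCode.OK);   // mark success
    } catch (Throwable t) {
      span.recordException(t);         // attach the failure to the span
      span.setStatus(StatusCode.ERROR);
      throw t;
    } finally {
      span.end();                      // synchronous work: span ends when the method returns
    }
  }
}

The synchronous form is enough for methods like AsyncConnection.close() below; the asynchronous operations need the future-aware variant sketched further down.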
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 2ed7399..b919ee1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -27,6 +27,8 @@ import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRI
 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -153,14 +156,13 @@ class AsyncConnectionImpl implements AsyncConnection {
       LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
     } else {
       try {
-        listener = new ClusterStatusListener(
-          new ClusterStatusListener.DeadServerHandler() {
-            @Override
-            public void newDead(ServerName sn) {
-              locator.clearCache(sn);
-              rpcClient.cancelConnections(sn);
-            }
-          }, conf, listenerClass);
+        listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
+          @Override
+          public void newDead(ServerName sn) {
+            locator.clearCache(sn);
+            rpcClient.cancelConnections(sn);
+          }
+        }, conf, listenerClass);
       } catch (IOException e) {
         LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
       }
@@ -195,27 +197,29 @@ class AsyncConnectionImpl implements AsyncConnection {

   @Override
   public void close() {
-    // As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a
-    // simple volatile flag.
-    if (closed) {
-      return;
-    }
-    LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
-    if(LOG.isDebugEnabled()){
-      logCallStack(Thread.currentThread().getStackTrace());
-    }
-    IOUtils.closeQuietly(clusterStatusListener,
-      e -> LOG.warn("failed to close clusterStatusListener", e));
-    IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
-    IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
-    synchronized (this) {
-      if (choreService != null) {
-        choreService.shutdown();
-        choreService = null;
+    TraceUtil.trace(() -> {
+      // As the code below is safe to be executed in parallel, here we do not use CAS or lock,
+      // just a simple volatile flag.
+      if (closed) {
+        return;
       }
-    }
-    metrics.ifPresent(MetricsConnection::shutdown);
-    closed = true;
+      LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
+      if (LOG.isDebugEnabled()) {
+        logCallStack(Thread.currentThread().getStackTrace());
+      }
+      IOUtils.closeQuietly(clusterStatusListener,
+        e -> LOG.warn("failed to close clusterStatusListener", e));
+      IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
+      IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
+      synchronized (this) {
+        if (choreService != null) {
+          choreService.shutdown();
+          choreService = null;
+        }
+      }
+      metrics.ifPresent(MetricsConnection::shutdown);
+      closed = true;
+    }, "AsyncConnection.close");
   }

   private void logCallStack(StackTraceElement[] stackTraceElements) {
@@ -320,7 +324,7 @@ class AsyncConnectionImpl implements AsyncConnection {

   @Override
   public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
-      ExecutorService pool) {
+    ExecutorService pool) {
     return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {

       @Override
@@ -361,35 +365,43 @@ class AsyncConnectionImpl implements AsyncConnection {

   @Override
   public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
-      ExecutorService pool) {
+    ExecutorService pool) {
     return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
       RETRY_TIMER);
   }

   @Override
   public CompletableFuture<Hbck> getHbck() {
-    CompletableFuture<Hbck> future = new CompletableFuture<>();
-    addListener(registry.getActiveMaster(), (sn, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-      } else {
-        try {
-          future.complete(getHbck(sn));
-        } catch (IOException e) {
-          future.completeExceptionally(e);
+    return TraceUtil.tracedFuture(() -> {
+      CompletableFuture<Hbck> future = new CompletableFuture<>();
+      addListener(registry.getActiveMaster(), (sn, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else {
+          try {
+            future.complete(getHbck(sn));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
         }
-      }
-    });
-    return future;
+      });
+      return future;
+    }, getClass().getName() + ".getHbck");
   }
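getHbck() above shows the asynchronous variant: the span has to stay open until the returned future completes, not until the method returns. A hypothetical sketch of how such a tracedFuture helper can be built, mirroring the listener pattern visible in AsyncRegionLocator.tracedLocationFuture further down (again a stand-in for the committed TraceUtil, which is outside this excerpt):

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public final class TracedFutureSketch {
  public static <T> CompletableFuture<T> tracedFuture(
      Supplier<CompletableFuture<T>> action, String spanName) {
    Span span = GlobalOpenTelemetry.getTracer("org.apache.hbase").spanBuilder(spanName).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      // RPCs started inside the supplier pick up the current span as their parent.
      CompletableFuture<T> future = action.get();
      future.whenComplete((resp, error) -> {
        if (error != null) {
          span.recordException(error);
          span.setStatus(StatusCode.ERROR);
        } else {
          span.setStatus(StatusCode.OK);
        }
        span.end(); // end only when the async work actually finishes
      });
      return future;
    }
  }
}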
   @Override
   public Hbck getHbck(ServerName masterServer) throws IOException {
-    // we will not create a new connection when creating a new protobuf stub, and for hbck there
-    // will be no performance consideration, so for simplification we will create a new stub every
-    // time instead of caching the stub here.
-    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
-      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
+    Span span = TraceUtil.createSpan(getClass().getName() + ".getHbck")
+      .setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
+    try (Scope scope = span.makeCurrent()) {
+      // we will not create a new connection when creating a new protobuf stub, and for hbck there
+      // will be no performance consideration, so for simplification we will create a new stub every
+      // time instead of caching the stub here.
+      return new HBaseHbck(
+        MasterProtos.HbckService
+          .newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)),
+        rpcControllerFactory);
+    }
   }

   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index d50070a..1d0efcc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -18,16 +18,28 @@ package org.apache.hadoop.hbase.client;

 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.trace.TraceUtil.REGION_NAMES_KEY;
+import static org.apache.hadoop.hbase.trace.TraceUtil.SERVER_NAME_KEY;
+import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
+import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
@@ -60,7 +72,7 @@ class AsyncRegionLocator {
   }

   private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
-      Supplier<String> timeoutMsg) {
+    Supplier<String> timeoutMsg) {
     if (future.isDone() || timeoutNs <= 0) {
       return future;
     }
@@ -83,64 +95,101 @@
     return TableName.isMetaTableName(tableName);
   }

+  private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
+    Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
+    Span span = createTableSpan(getClass().getSimpleName() + "." + methodName, tableName);
+    try (Scope scope = span.makeCurrent()) {
+      CompletableFuture<T> future = action.get();
+      FutureUtils.addListener(future, (resp, error) -> {
+        if (error != null) {
+          span.recordException(error);
+          span.setStatus(StatusCode.ERROR);
+        } else {
+          List<String> regionNames = getRegionNames.apply(resp);
+          if (!regionNames.isEmpty()) {
+            span.setAttribute(REGION_NAMES_KEY, regionNames);
+          }
+          span.setStatus(StatusCode.OK);
+        }
+        span.end();
+      });
+      return future;
+    }
+  }
+
+  private List<String> getRegionName(RegionLocations locs) {
+    List<String> names = new ArrayList<>();
+    for (HRegionLocation loc : locs.getRegionLocations()) {
+      if (loc != null) {
+        names.add(loc.getRegion().getRegionNameAsString());
+      }
+    }
+    return names;
+  }
+
   CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
-    RegionLocateType type, boolean reload, long timeoutNs) {
-    CompletableFuture<RegionLocations> future = isMeta(tableName)
-      ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
-      : nonMetaRegionLocator.getRegionLocations(tableName, row,
-        RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
-    return withTimeout(future, timeoutNs,
-      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
-        "ms) waiting for region locations for " + tableName + ", row='" +
-        Bytes.toStringBinary(row) + "'");
+    RegionLocateType type, boolean reload, long timeoutNs) {
+    return tracedLocationFuture(() -> {
+      CompletableFuture<RegionLocations> future = isMeta(tableName) ?
+        metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
+        nonMetaRegionLocator.getRegionLocations(tableName, row,
+          RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+      return withTimeout(future, timeoutNs,
+        () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
+          "ms) waiting for region locations for " + tableName + ", row='" +
+          Bytes.toStringBinary(row) + "'");
+    }, this::getRegionName, tableName, "getRegionLocations");
   }

   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-    int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
-    // meta region can not be split right now so we always call the same method.
-    // Change it later if the meta table can have more than one regions.
-    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-    CompletableFuture<RegionLocations> locsFuture =
-      isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
-        : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
-    addListener(locsFuture, (locs, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-        return;
-      }
-      HRegionLocation loc = locs.getRegionLocation(replicaId);
-      if (loc == null) {
-        future.completeExceptionally(
-          new RegionOfflineException("No location for " + tableName + ", row='" +
-            Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
-      } else if (loc.getServerName() == null) {
-        future.completeExceptionally(
-          new RegionOfflineException("No server address listed for region '" +
-            loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) +
-            "', locateType=" + type + ", replicaId=" + replicaId));
-      } else {
-        future.complete(loc);
-      }
-    });
-    return withTimeout(future, timeoutNs,
-      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
-        "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) +
-        "', replicaId=" + replicaId);
+    int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
+    return tracedLocationFuture(() -> {
+      // meta region can not be split right now so we always call the same method.
+      // Change it later if the meta table can have more than one regions.
+      CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+      CompletableFuture<RegionLocations> locsFuture =
+        isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload) :
+          nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+      addListener(locsFuture, (locs, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+          return;
+        }
+        HRegionLocation loc = locs.getRegionLocation(replicaId);
+        if (loc == null) {
+          future.completeExceptionally(
+            new RegionOfflineException("No location for " + tableName + ", row='" +
+              Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
+        } else if (loc.getServerName() == null) {
+          future.completeExceptionally(
+            new RegionOfflineException("No server address listed for region '" +
+              loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) +
+              "', locateType=" + type + ", replicaId=" + replicaId));
+        } else {
+          future.complete(loc);
+        }
+      });
+      return withTimeout(future, timeoutNs,
+        () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
+          "ms) waiting for region location for " + tableName + ", row='" +
+          Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
+    }, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
+      "getRegionLocation");
   }

   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-    int replicaId, RegionLocateType type, long timeoutNs) {
+    int replicaId, RegionLocateType type, long timeoutNs) {
     return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
   }

   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-    RegionLocateType type, boolean reload, long timeoutNs) {
+    RegionLocateType type, boolean reload, long timeoutNs) {
     return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
       timeoutNs);
   }

   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-    RegionLocateType type, long timeoutNs) {
+    RegionLocateType type, long timeoutNs) {
     return getRegionLocation(tableName, row, type, false, timeoutNs);
   }

@@ -153,24 +202,31 @@
   }

   void clearCache(TableName tableName) {
-    LOG.debug("Clear meta cache for {}", tableName);
-    if (tableName.equals(META_TABLE_NAME)) {
-      metaRegionLocator.clearCache();
-    } else {
-      nonMetaRegionLocator.clearCache(tableName);
-    }
+    TraceUtil.trace(() -> {
+      LOG.debug("Clear meta cache for {}", tableName);
+      if (tableName.equals(META_TABLE_NAME)) {
+        metaRegionLocator.clearCache();
+      } else {
+        nonMetaRegionLocator.clearCache(tableName);
+      }
+    }, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
   }

   void clearCache(ServerName serverName) {
-    LOG.debug("Clear meta cache for {}", serverName);
-    metaRegionLocator.clearCache(serverName);
-    nonMetaRegionLocator.clearCache(serverName);
-    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
+    TraceUtil.trace(() -> {
+      LOG.debug("Clear meta cache for {}", serverName);
+      metaRegionLocator.clearCache(serverName);
+      nonMetaRegionLocator.clearCache(serverName);
+      conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
+    }, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
+      serverName.getServerName()));
   }

   void clearCache() {
-    metaRegionLocator.clearCache();
-    nonMetaRegionLocator.clearCache();
+    TraceUtil.trace(() -> {
+      metaRegionLocator.clearCache();
+      nonMetaRegionLocator.clearCache();
+    }, "AsyncRegionLocator.clearCache");
   }

   AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
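tracedLocationFuture above attaches the table identity when the span is created and the resolved region names when the lookup completes, so a trace shows not only that a lookup happened but what it resolved to. A sketch of that attribute plumbing; the AttributeKey strings here are illustrative assumptions, while the real NAMESPACE_KEY, TABLE_KEY and REGION_NAMES_KEY live in the committed TraceUtil and are read back by the new tests below:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import java.util.List;

public final class TableSpanSketch {
  // Hypothetical key names; only the pattern (typed keys, set at span creation
  // and at completion) is taken from the patch.
  public static final AttributeKey<String> NAMESPACE_KEY =
    AttributeKey.stringKey("db.hbase.namespace");
  public static final AttributeKey<String> TABLE_KEY =
    AttributeKey.stringKey("db.hbase.table");
  public static final AttributeKey<List<String>> REGION_NAMES_KEY =
    AttributeKey.stringArrayKey("db.hbase.regions");

  public static Span createTableSpan(String name, String namespace, String table) {
    // Table identity travels as span attributes rather than being baked into the span name.
    return GlobalOpenTelemetry.getTracer("org.apache.hbase").spanBuilder(name).startSpan()
      .setAttribute(NAMESPACE_KEY, namespace)
      .setAttribute(TABLE_KEY, table);
  }
}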
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 7473ed0..c7003e0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -70,6 +70,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
    * Gets the {@link AsyncTableRegionLocator} for this table.
    */
   AsyncTableRegionLocator getRegionLocator();
+
   /**
    * Get timeout of each rpc request in this Table instance. It will be overridden by a more
    * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
@@ -184,7 +185,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
    * {@link CompletableFuture}.
    */
   default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount) {
+    long amount) {
     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
   }

@@ -204,12 +205,12 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
    * {@link CompletableFuture}.
    */
   default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount, Durability durability) {
+    long amount, Durability durability) {
     Preconditions.checkNotNull(row, "row is null");
     Preconditions.checkNotNull(family, "family is null");
     return increment(
       new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
-        .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
+      .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
   }

@@ -361,9 +362,8 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
   }

   /**
-   * checkAndMutate that atomically checks if a row matches the specified condition. If it does,
-   * it performs the specified action.
-   *
+   * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it
+   * performs the specified action.
    * @param checkAndMutate The CheckAndMutate object.
    * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate.
    */
@@ -373,22 +373,19 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
    * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
    * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
    * atomically (and thus, each may fail independently of others).
-   *
    * @param checkAndMutates The list of CheckAndMutate.
-   * @return A list of {@link CompletableFuture}s that represent the result for each
-   *         CheckAndMutate.
+   * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate.
    */
-  List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
-    List<CheckAndMutate> checkAndMutates);
+  List<CompletableFuture<CheckAndMutateResult>>
+    checkAndMutate(List<CheckAndMutate> checkAndMutates);

   /**
    * A simple version of batch checkAndMutate. It will fail if there are any failures.
-   *
    * @param checkAndMutates The list of rows to apply.
    * @return A {@link CompletableFuture} that wrapper the result list.
    */
-  default CompletableFuture<List<CheckAndMutateResult>> checkAndMutateAll(
-    List<CheckAndMutate> checkAndMutates) {
+  default CompletableFuture<List<CheckAndMutateResult>>
+    checkAndMutateAll(List<CheckAndMutate> checkAndMutates) {
     return allOf(checkAndMutate(checkAndMutates));
   }

@@ -484,7 +481,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
    */
   default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
     return get(toCheckExistenceOnly(gets)).stream()
-        .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
+      .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
   }

@@ -592,7 +589,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
    * @see ServiceCaller
    */
   <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      ServiceCaller<S, R> callable, byte[] row);
+    ServiceCaller<S, R> callable, byte[] row);

   /**
    * The callback when we want to execute a coprocessor call on a range of regions.
@@ -731,5 +728,5 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
    * for more details.
    */
   <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
+    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index fa3ea1c..d5b275d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;

+import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -47,19 +49,21 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {

   @Override
   public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
-      boolean reload) {
+    boolean reload) {
     return conn.getLocator().getRegionLocation(tableName, row, replicaId,
       RegionLocateType.CURRENT, reload, -1L);
   }

   @Override
   public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
-    if (TableName.isMetaTableName(tableName)) {
-      return conn.registry.getMetaRegionLocations()
-        .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
-    }
-    return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),
-      tableName);
+    return tracedFuture(() -> {
+      if (TableName.isMetaTableName(tableName)) {
+        return conn.registry.getMetaRegionLocations()
+          .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
+      }
+      return AsyncMetaTableAccessor
+        .getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName);
+    }, getClass().getSimpleName() + ".getAllRegionLocations");
   }

   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index f91b210..627e8d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -280,30 +281,32 @@ public class ConnectionFactory {
    */
   public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
     final User user) {
-    CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
-    ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
-    addListener(registry.getClusterId(), (clusterId, error) -> {
-      if (error != null) {
-        registry.close();
-        future.completeExceptionally(error);
-        return;
-      }
-      if (clusterId == null) {
-        registry.close();
-        future.completeExceptionally(new IOException("clusterid came back null"));
-        return;
-      }
-      Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
-        AsyncConnectionImpl.class, AsyncConnection.class);
-      try {
-        future.complete(
-          user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
-            .newInstance(clazz, conf, registry, clusterId, user)));
-      } catch (Exception e) {
-        registry.close();
-        future.completeExceptionally(e);
-      }
-    });
-    return future;
+    return TraceUtil.tracedFuture(() -> {
+      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
+      ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
+      addListener(registry.getClusterId(), (clusterId, error) -> {
+        if (error != null) {
+          registry.close();
+          future.completeExceptionally(error);
+          return;
+        }
+        if (clusterId == null) {
+          registry.close();
+          future.completeExceptionally(new IOException("clusterid came back null"));
+          return;
+        }
+        Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
+          AsyncConnectionImpl.class, AsyncConnection.class);
+        try {
+          future.complete(user.runAs(
+            (PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
+              .newInstance(clazz, conf, registry, clusterId, user)));
+        } catch (Exception e) {
+          registry.close();
+          future.completeExceptionally(e);
+        }
+      });
+      return future;
+    }, ConnectionFactory.class.getSimpleName() + ".createAsyncConnection");
   }
 }
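Nothing changes for callers: ConnectionFactory keeps its signature and simply runs inside a span, so whatever span is current at the call site becomes the parent of the client spans. A usage sketch under that assumption (configuration, table and row names are made up):

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;

public class TracedClientExample {
  public static void main(String[] args) throws Exception {
    Span appSpan = GlobalOpenTelemetry.getTracer("example").spanBuilder("app.readRow").startSpan();
    try (Scope ignored = appSpan.makeCurrent()) {
      // The "ConnectionFactory.createAsyncConnection" and "AsyncTable.get" spans created by
      // the patched client become children of appSpan.
      try (AsyncConnection conn =
          ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
        conn.getTable(TableName.valueOf("test-table")) // hypothetical table
          .get(new Get(Bytes.toBytes("row-1"))).join();
      }
    } finally {
      appSpan.end();
    }
  }
}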
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index bed896e..f637e47 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
+import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
+import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

 import com.google.protobuf.RpcChannel;
@@ -131,8 +133,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     }
     this.maxAttempts = builder.maxAttempts;
     this.startLogErrorsCnt = builder.startLogErrorsCnt;
-    this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
-      : conn.connConf.getScannerCaching();
+    this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() :
+      conn.connConf.getScannerCaching();
     this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
   }

@@ -204,15 +206,15 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   }

   private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
-      HRegionLocation loc, ClientService.Interface stub, REQ req,
-      Converter<MutateRequest, byte[], REQ> reqConvert) {
+    HRegionLocation loc, ClientService.Interface stub, REQ req,
+    Converter<MutateRequest, byte[], REQ> reqConvert) {
     return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
       return null;
     });
   }

   private static Result toResult(HBaseRpcController controller, MutateResponse resp)
-      throws IOException {
+    throws IOException {
     if (!resp.hasResult()) {
       return null;
     }
@@ -225,9 +227,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   }

   private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
-      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
-      NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
-      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
+    HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
+    NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
+    Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
     return mutate(controller, loc, stub, req,
       (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
   }
@@ -240,8 +242,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
       .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
   }

-  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
-    R row, long rpcTimeoutNs) {
+  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
+    newCaller(R row, long rpcTimeoutNs) {
     return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
   }

@@ -256,50 +258,58 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {

   @Override
   public CompletableFuture<Result> get(Get get) {
-    return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
-      RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
-      conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics());
+    return tracedFuture(
+      () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
+        RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
+        conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
+      "AsyncTable.get", tableName);
   }

   @Override
   public CompletableFuture<Void> put(Put put) {
     validatePut(put, conn.connConf.getMaxKeyValueSize());
-    return this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
+    return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
       .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
         put, RequestConverter::buildMutateRequest))
-      .call();
+      .call(), "AsyncTable.put", tableName);
   }

   @Override
   public CompletableFuture<Void> delete(Delete delete) {
-    return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
-      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
-        stub, delete, RequestConverter::buildMutateRequest))
-      .call();
+    return tracedFuture(
+      () -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
+          stub, delete, RequestConverter::buildMutateRequest))
+        .call(),
+      "AsyncTable.delete", tableName);
   }

   @Override
   public CompletableFuture<Result> append(Append append) {
     checkHasFamilies(append);
-    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
-    long nonce = conn.getNonceGenerator().newNonce();
-    return this.<Result, Append> newCaller(append, rpcTimeoutNs)
-      .action(
-        (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
-          loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
-      .call();
+    return tracedFuture(() -> {
+      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+      long nonce = conn.getNonceGenerator().newNonce();
+      return this.<Result, Append> newCaller(append, rpcTimeoutNs)
+        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce,
+          controller, loc, stub, append, RequestConverter::buildMutateRequest,
+          RawAsyncTableImpl::toResult))
+        .call();
+    }, "AsyncTable.append", tableName);
   }

   @Override
   public CompletableFuture<Result> increment(Increment increment) {
     checkHasFamilies(increment);
-    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
-    long nonce = conn.getNonceGenerator().newNonce();
-    return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
-      .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
-        controller, loc, stub, increment, RequestConverter::buildMutateRequest,
-        RawAsyncTableImpl::toResult))
-      .call();
+    return tracedFuture(() -> {
+      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+      long nonce = conn.getNonceGenerator().newNonce();
+      return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
+        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
+          controller, loc, stub, increment, RequestConverter::buildMutateRequest,
+          RawAsyncTableImpl::toResult))
+        .call();
+    }, "AsyncTable.increment", tableName);
   }

   private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@@ -357,39 +367,43 @@ public CompletableFuture<Boolean> thenPut(Put put) {
       validatePut(put, conn.connConf.getMaxKeyValueSize());
       preCheck();
-      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
-          stub, put,
-          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
-            null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
-          (c, r) -> r.getProcessed()))
-        .call();
+      return tracedFuture(
+        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
+          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
+            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+              null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
+            (c, r) -> r.getProcessed()))
+          .call(),
+        "AsyncTable.CheckAndMutateBuilder.thenPut", tableName);
     }

     @Override
     public CompletableFuture<Boolean> thenDelete(Delete delete) {
       preCheck();
-      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
-          loc, stub, delete,
-          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
-            null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
-          (c, r) -> r.getProcessed()))
-        .call();
+      return tracedFuture(
+        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
+          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
+            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+              null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
+            (c, r) -> r.getProcessed()))
+          .call(),
+        "AsyncTable.CheckAndMutateBuilder.thenDelete", tableName);
     }

     @Override
     public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
       preCheck();
       validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
-      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
-        rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
-          loc, stub, mutation,
-          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
-            null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
-          CheckAndMutateResult::isSuccess))
-        .call();
+      return tracedFuture(
+        () -> RawAsyncTableImpl.this
+          .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
+          .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
+            mutation,
+            (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
+              null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+            CheckAndMutateResult::isSuccess))
+          .call(),
+        "AsyncTable.CheckAndMutateBuilder.thenMutate", tableName);
     }
   }

@@ -421,37 +435,42 @@
     @Override
     public CompletableFuture<Boolean> thenPut(Put put) {
       validatePut(put, conn.connConf.getMaxKeyValueSize());
-      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
+      return tracedFuture(
+        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
           (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
             timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
           (c, r) -> r.getProcessed()))
-        .call();
+        .call(),
+        "AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName);
     }

     @Override
     public CompletableFuture<Boolean> thenDelete(Delete delete) {
-      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
-          loc, stub, delete,
-          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
-            filter, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
-          (c, r) -> r.getProcessed()))
-        .call();
+      return tracedFuture(
+        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
+          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
+            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
+              timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
+            (c, r) -> r.getProcessed()))
+          .call(),
+        "AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName);
     }
    @Override
     public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
       validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
-      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
-        rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
-          loc, stub, mutation,
-          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null,
-            filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
-          CheckAndMutateResult::isSuccess))
-        .call();
+      return tracedFuture(
+        () -> RawAsyncTableImpl.this
+          .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
+          .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
+            mutation,
+            (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
+              timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+            CheckAndMutateResult::isSuccess))
+          .call(),
+        "AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName);
     }
   }
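Whichever branch the action takes in the checkAndMutate method below (a single Mutation or a RowMutations), one "AsyncTable.checkAndMutate" span covers the whole conditional round trip. A short usage sketch under that assumption (family, qualifier and row names are made up; `table` is an AsyncTable obtained as in the earlier connection example):

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndMutateExample {
  // The check and the conditional write run under a single "AsyncTable.checkAndMutate" span.
  static CompletableFuture<CheckAndMutateResult> putIfAbsent(AsyncTable<?> table) {
    byte[] row = Bytes.toBytes("row-1");
    CheckAndMutate cam = CheckAndMutate.newBuilder(row)
      .ifNotExists(Bytes.toBytes("cf"), Bytes.toBytes("q"))
      .build(new Put(row).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
    return table.checkAndMutate(cam);
  }
}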
@@ -462,63 +481,69 @@

   @Override
   public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
-    if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
-      || checkAndMutate.getAction() instanceof Increment
-      || checkAndMutate.getAction() instanceof Append) {
-      Mutation mutation = (Mutation) checkAndMutate.getAction();
-      if (mutation instanceof Put) {
-        validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
+    return tracedFuture(() -> {
+      if (checkAndMutate.getAction() instanceof Put ||
+        checkAndMutate.getAction() instanceof Delete ||
+        checkAndMutate.getAction() instanceof Increment ||
+        checkAndMutate.getAction() instanceof Append) {
+        Mutation mutation = (Mutation) checkAndMutate.getAction();
+        if (mutation instanceof Put) {
+          validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
+        }
+        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+        long nonce = conn.getNonceGenerator().newNonce();
+        return RawAsyncTableImpl.this
+          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(),
+            rpcTimeoutNs)
+          .action(
+            (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation,
+              (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
+                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
+                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
+              (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
+          .call();
+      } else if (checkAndMutate.getAction() instanceof RowMutations) {
+        RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
+        validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
+        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+        long nonce = conn.getNonceGenerator().newNonce();
+        return RawAsyncTableImpl.this
+          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(),
+            rpcTimeoutNs)
+          .action((controller, loc, stub) -> RawAsyncTableImpl.this
+            .<CheckAndMutateResult, CheckAndMutateResult> mutateRow(controller, loc, stub,
+              rowMutations,
+              (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
+                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
+                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
+              resp -> resp))
+          .call();
+      } else {
+        CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
+        future.completeExceptionally(new DoNotRetryIOException(
+          "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
+        return future;
       }
-      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
-      long nonce = conn.getNonceGenerator().newNonce();
-      return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
-        mutation.getPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
-          loc, stub, mutation,
-          (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
-            checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
-            checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
-            checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
-          (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
-        .call();
-    } else if (checkAndMutate.getAction() instanceof RowMutations) {
-      RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
-      validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
-      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
-      long nonce = conn.getNonceGenerator().newNonce();
-      return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
-        rowMutations.getMaxPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) ->
-          RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(
-            controller, loc, stub, rowMutations,
-            (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
-              checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
-              checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
-              checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
-            resp -> resp))
-        .call();
-    } else {
-      CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
-      future.completeExceptionally(new DoNotRetryIOException(
-        "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
-      return future;
-    }
+    }, "AsyncTable.checkAndMutate", tableName);
   }

   @Override
-  public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
-    List<CheckAndMutate> checkAndMutates) {
-    return batch(checkAndMutates, rpcTimeoutNs).stream()
-      .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList());
+  public List<CompletableFuture<CheckAndMutateResult>>
+    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
+    return tracedFutures(
+      () -> batch(checkAndMutates, rpcTimeoutNs).stream()
+        .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
+      "AsyncTable.checkAndMutateList", tableName);
   }

   // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
   // so here I write a new method as I do not want to change the abstraction of call method.
   @SuppressWarnings("unchecked")
   private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
-      HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
-      Converter<MultiRequest, byte[], RowMutations> reqConvert,
-      Function<RES, RESP> respConverter) {
+    HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
+    Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) {
     CompletableFuture<RESP> future = new CompletableFuture<>();
     try {
       byte[] regionName = loc.getRegion().getRegionName();
@@ -537,12 +562,12 @@
             loc.getServerName(), multiResp);
           Throwable ex = multiResp.getException(regionName);
           if (ex != null) {
-            future.completeExceptionally(ex instanceof IOException ? ex
-              : new IOException(
+            future.completeExceptionally(ex instanceof IOException ? ex :
+              new IOException(
                 "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
           } else {
-            future.complete(respConverter
-              .apply((RES) multiResp.getResults().get(regionName).result.get(0)));
+            future.complete(
+              respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0)));
           }
         } catch (IOException e) {
           future.completeExceptionally(e);
@@ -561,12 +586,14 @@
     validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
     long nonceGroup = conn.getNonceGenerator().getNonceGroup();
     long nonce = conn.getNonceGenerator().newNonce();
-    return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
-      writeRpcTimeoutNs).action((controller, loc, stub) ->
-        this.<Result, Result> mutateRow(controller, loc, stub, mutations,
-          (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
+    return tracedFuture(
+      () -> this
+        .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub,
+          mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
           resp -> resp))
-      .call();
+        .call(),
+      "AsyncTable.mutateRow", tableName);
   }

   private Scan setDefaultScanConfig(Scan scan) {
@@ -602,46 +629,48 @@

   @Override
   public CompletableFuture<List<Result>> scanAll(Scan scan) {
-    CompletableFuture<List<Result>> future = new CompletableFuture<>();
-    List<Result> scanResults = new ArrayList<>();
-    scan(scan, new AdvancedScanResultConsumer() {
+    return tracedFuture(() -> {
+      CompletableFuture<List<Result>> future = new CompletableFuture<>();
+      List<Result> scanResults = new ArrayList<>();
+      scan(scan, new AdvancedScanResultConsumer() {

-      @Override
-      public void onNext(Result[] results, ScanController controller) {
-        scanResults.addAll(Arrays.asList(results));
-      }
+        @Override
+        public void onNext(Result[] results, ScanController controller) {
+          scanResults.addAll(Arrays.asList(results));
+        }

-      @Override
-      public void onError(Throwable error) {
-        future.completeExceptionally(error);
-      }
+        @Override
+        public void onError(Throwable error) {
+          future.completeExceptionally(error);
+        }

-      @Override
-      public void onComplete() {
-        future.complete(scanResults);
-      }
-    });
-    return future;
+        @Override
+        public void onComplete() {
+          future.complete(scanResults);
+        }
+      });
+      return future;
+    }, "AsyncTable.scanAll", tableName);
   }
   @Override
   public List<CompletableFuture<Result>> get(List<Get> gets) {
-    return batch(gets, readRpcTimeoutNs);
+    return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName);
   }

   @Override
   public List<CompletableFuture<Void>> put(List<Put> puts) {
-    return voidMutate(puts);
+    return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName);
   }

   @Override
   public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
-    return voidMutate(deletes);
+    return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName);
   }

   @Override
   public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
-    return batch(actions, rpcTimeoutNs);
+    return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName);
   }

   private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
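The list variants above (getList, putList, deleteList, batch, and checkAndMutateList earlier) return one future per operation but only one span per call. A hypothetical sketch of a tracedFutures helper under that contract, a stand-in for the committed helper, ending the span once every per-operation future has settled:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public final class TracedFuturesSketch {
  public static <T> List<CompletableFuture<T>> tracedFutures(
      Supplier<List<CompletableFuture<T>>> action, String spanName) {
    Span span = GlobalOpenTelemetry.getTracer("org.apache.hbase").spanBuilder(spanName).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      List<CompletableFuture<T>> futures = action.get();
      // One span for the whole batch: end it only when every future has completed.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .whenComplete((v, error) -> {
          span.setStatus(error == null ? StatusCode.OK : StatusCode.ERROR);
          span.end();
        });
      return futures;
    }
  }
}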
@@ -698,7 +727,7 @@
   }

   private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
+    ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
     RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
       region, row, rpcTimeoutNs, operationTimeoutNs);
     S stub = stubMaker.apply(channel);
@@ -716,7 +745,7 @@

   @Override
   public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      ServiceCaller<S, R> callable, byte[] row) {
+    ServiceCaller<S, R> callable, byte[] row) {
     return coprocessorService(stubMaker, callable, null, row);
   }

@@ -738,9 +767,9 @@
   }

   private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
-      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
-      byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
-      AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
+    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
+    byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
+    AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
     if (error != null) {
       callback.onError(error);
       return;
@@ -769,7 +798,7 @@
   }

   private final class CoprocessorServiceBuilderImpl<S, R>
-      implements CoprocessorServiceBuilder<S, R> {
+    implements CoprocessorServiceBuilder<S, R> {

     private final Function<RpcChannel, S> stubMaker;

@@ -786,7 +815,7 @@
     private boolean endKeyInclusive;

     public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
-        ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
+      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
       this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
       this.callable = Preconditions.checkNotNull(callable, "callable is null");
       this.callback = Preconditions.checkNotNull(callback, "callback is null");
@@ -823,8 +852,8 @@

   @Override
   public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
-      Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
-      CoprocessorCallback<R> callback) {
+    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
+    CoprocessorCallback<R> callback) {
     return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 9117fef..b671095 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -393,10 +393,11 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   }

   private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
-      final Message param, Message returnType, final User ticket,
-      final Address addr, final RpcCallback<Message> callback) {
-    Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
-      .startSpan();
+    final Message param, Message returnType, final User ticket, final Address addr,
+    final RpcCallback<Message> callback) {
+    Span span = TraceUtil.createSpan("RpcClient.callMethod")
+      .setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName())
+      .setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName());
     try (Scope scope = span.makeCurrent()) {
       final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
       cs.setStartTime(EnvironmentEdgeManager.currentTime());
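At the RPC layer the change above swaps the per-method span name ("RpcClient.callMethod." plus the full method name) for a constant "RpcClient.callMethod" with the service and method recorded as attributes, which keeps span-name cardinality low while preserving the detail. A sketch of the two keys and how they get applied; the key strings are illustrative assumptions, since the real RPC_SERVICE_KEY and RPC_METHOD_KEY live in the committed TraceUtil outside this excerpt:

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;

public final class RpcSpanAttributesSketch {
  // Hypothetical stand-ins for TraceUtil.RPC_SERVICE_KEY / RPC_METHOD_KEY.
  public static final AttributeKey<String> RPC_SERVICE_KEY =
    AttributeKey.stringKey("db.hbase.rpc.service");
  public static final AttributeKey<String> RPC_METHOD_KEY =
    AttributeKey.stringKey("db.hbase.rpc.method");

  public static Span tag(Span span, String service, String method) {
    // e.g. service = "ClientService", method = "Get" for a ClientService Get call
    return span.setAttribute(RPC_SERVICE_KEY, service).setAttribute(RPC_METHOD_KEY, method);
  }
}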
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
new file mode 100644
index 0000000..708ae4c
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncRegionLocatorTracing {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncRegionLocatorTracing.class);
+
+  private static Configuration CONF = HBaseConfiguration.create();
+
+  private AsyncConnectionImpl conn;
+
+  private RegionLocations locs;
+
+  @Rule
+  public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
+
+  @Before
+  public void setUp() throws IOException {
+    RegionInfo metaRegionInfo = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).build();
+    locs = new RegionLocations(
+      new HRegionLocation(metaRegionInfo,
+        ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis())),
+      new HRegionLocation(RegionReplicaUtil.getRegionInfoForReplica(metaRegionInfo, 1),
+        ServerName.valueOf("127.0.0.2", 12345, System.currentTimeMillis())),
+      new HRegionLocation(RegionReplicaUtil.getRegionInfoForReplica(metaRegionInfo, 2),
+        ServerName.valueOf("127.0.0.3", 12345, System.currentTimeMillis())));
+    conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF) {
+
+      @Override
+      public CompletableFuture<RegionLocations> getMetaRegionLocations() {
+        return CompletableFuture.completedFuture(locs);
+      }
+    }, "test", UserProvider.instantiate(CONF).getCurrent());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(conn, true);
+  }
+
+  private SpanData waitSpan(String name) {
+    Waiter.waitFor(CONF, 1000,
+      () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
+    return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
+  }
+
+  @Test
+  public void testClearCache() {
+    conn.getLocator().clearCache();
+    SpanData span = waitSpan("AsyncRegionLocator.clearCache");
+    assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
+  }
+
+  @Test
+  public void testClearCacheServerName() {
+    ServerName sn = ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis());
+    conn.getLocator().clearCache(sn);
+    SpanData span = waitSpan("AsyncRegionLocator.clearCache");
+    assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
+    assertEquals(sn.toString(),
span.getAttributes().get(TraceUtil.SERVER_NAME_KEY)); + } + + @Test + public void testClearCacheTableName() { + conn.getLocator().clearCache(TableName.META_TABLE_NAME); + SpanData span = waitSpan("AsyncRegionLocator.clearCache"); + assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(), + span.getAttributes().get(TraceUtil.NAMESPACE_KEY)); + assertEquals(TableName.META_TABLE_NAME.getNameAsString(), + span.getAttributes().get(TraceUtil.TABLE_KEY)); + } + + @Test + public void testGetRegionLocation() { + conn.getLocator().getRegionLocation(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, + RegionLocateType.CURRENT, TimeUnit.SECONDS.toNanos(1)).join(); + SpanData span = waitSpan("AsyncRegionLocator.getRegionLocation"); + assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(), + span.getAttributes().get(TraceUtil.NAMESPACE_KEY)); + assertEquals(TableName.META_TABLE_NAME.getNameAsString(), + span.getAttributes().get(TraceUtil.TABLE_KEY)); + List<String> regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY); + assertEquals(1, regionNames.size()); + assertEquals(locs.getDefaultRegionLocation().getRegion().getRegionNameAsString(), + regionNames.get(0)); + } + + @Test + public void testGetRegionLocations() { + conn.getLocator().getRegionLocations(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, + RegionLocateType.CURRENT, false, TimeUnit.SECONDS.toNanos(1)).join(); + SpanData span = waitSpan("AsyncRegionLocator.getRegionLocations"); + assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(), + span.getAttributes().get(TraceUtil.NAMESPACE_KEY)); + assertEquals(TableName.META_TABLE_NAME.getNameAsString(), + span.getAttributes().get(TraceUtil.TABLE_KEY)); + List<String> regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY); + assertEquals(3, regionNames.size()); + for (int i = 0; i < 3; i++) { + assertEquals(locs.getRegionLocation(i).getRegion().getRegionNameAsString(), + regionNames.get(i)); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java new file mode 100644 index 0000000..07cdf0e --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import io.opentelemetry.api.trace.Span.Kind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncTableTracing { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableTracing.class); + + private static Configuration CONF = HBaseConfiguration.create(); + + private ClientService.Interface stub; + + private AsyncConnection conn; + + private AsyncTable<?> table; + + 
@Rule + public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); + + @Before + public void setUp() throws IOException { + stub = mock(ClientService.Interface.class); + AtomicInteger scanNextCalled = new AtomicInteger(0); + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ScanRequest req = invocation.getArgument(1); + RpcCallback<ScanResponse> done = invocation.getArgument(2); + if (!req.hasScannerId()) { + done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800) + .setMoreResultsInRegion(true).setMoreResults(true).build()); + } else { + if (req.hasCloseScanner() && req.getCloseScanner()) { + done.run(ScanResponse.getDefaultInstance()); + } else { + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) + .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) + .setValue(Bytes.toBytes("v")).build(); + Result result = Result.create(Arrays.asList(cell)); + ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800) + .addResults(ProtobufUtil.toResult(result)); + if (req.getLimitOfRows() == 1) { + builder.setMoreResultsInRegion(false).setMoreResults(false); + } else { + builder.setMoreResultsInRegion(true).setMoreResults(true); + } + ForkJoinPool.commonPool().execute(() -> done.run(builder.build())); + } + } + return null; + } + }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any()); + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ClientProtos.MultiResponse resp = + ClientProtos.MultiResponse.newBuilder() + .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException( + ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())))) + .build(); + RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2); + ForkJoinPool.commonPool().execute(() -> done.run(resp)); + return null; + } + }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any()); + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation(); + MutateResponse resp; + switch (req.getMutateType()) { + case INCREMENT: + ColumnValue value = req.getColumnValue(0); + QualifierValue qvalue = value.getQualifierValue(0); + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) + .setType(Cell.Type.Put).setRow(req.getRow().toByteArray()) + .setFamily(value.getFamily().toByteArray()) + .setQualifier(qvalue.getQualifier().toByteArray()) + .setValue(qvalue.getValue().toByteArray()).build(); + resp = MutateResponse.newBuilder() + .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build(); + break; + default: + resp = MutateResponse.getDefaultInstance(); + break; + } + RpcCallback<MutateResponse> done = invocation.getArgument(2); + ForkJoinPool.commonPool().execute(() -> done.run(resp)); + return null; + } + }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any()); + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + RpcCallback<GetResponse> done = invocation.getArgument(2); + ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance())); + return null; + } + 
}).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); + conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", + UserProvider.instantiate(CONF).getCurrent()) { + + @Override + AsyncRegionLocator getLocator() { + AsyncRegionLocator locator = mock(AsyncRegionLocator.class); + Answer<CompletableFuture<HRegionLocation>> answer = + new Answer<CompletableFuture<HRegionLocation>>() { + + @Override + public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation) + throws Throwable { + TableName tableName = invocation.getArgument(0); + RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); + ServerName serverName = ServerName.valueOf("rs", 16010, 12345); + HRegionLocation loc = new HRegionLocation(info, serverName); + return CompletableFuture.completedFuture(loc); + } + }; + doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), + any(RegionLocateType.class), anyLong()); + doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), + anyInt(), any(RegionLocateType.class), anyLong()); + return locator; + } + + @Override + ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { + return stub; + } + }; + table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool()); + } + + @After + public void tearDown() throws IOException { + Closeables.close(conn, true); + } + + private void assertTrace(String methodName) { + Waiter.waitFor(CONF, 1000, + () -> traceRule.getSpans().stream() + .anyMatch(span -> span.getName().equals("AsyncTable." + methodName) && + span.getKind() == Kind.INTERNAL && span.hasEnded())); + SpanData data = traceRule.getSpans().stream() + .filter(s -> s.getName().equals("AsyncTable." 
+ methodName)).findFirst().get(); + assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); + TableName tableName = table.getName(); + assertEquals(tableName.getNamespaceAsString(), + data.getAttributes().get(TraceUtil.NAMESPACE_KEY)); + assertEquals(tableName.getNameAsString(), data.getAttributes().get(TraceUtil.TABLE_KEY)); + } + + @Test + public void testExists() { + table.exists(new Get(Bytes.toBytes(0))).join(); + assertTrace("get"); + } + + @Test + public void testGet() { + table.get(new Get(Bytes.toBytes(0))).join(); + assertTrace("get"); + } + + @Test + public void testPut() { + table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), + Bytes.toBytes("v"))).join(); + assertTrace("put"); + } + + @Test + public void testDelete() { + table.delete(new Delete(Bytes.toBytes(0))).join(); + assertTrace("delete"); + } + + @Test + public void testAppend() { + table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), + Bytes.toBytes("v"))).join(); + assertTrace("append"); + } + + @Test + public void testIncrement() { + table + .increment( + new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)) + .join(); + assertTrace("increment"); + } + + @Test + public void testIncrementColumnValue1() { + table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1) + .join(); + assertTrace("increment"); + } + + @Test + public void testIncrementColumnValue2() { + table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, + Durability.ASYNC_WAL).join(); + assertTrace("increment"); + } + + @Test + public void testCheckAndMutate() { + table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) + .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) + .build(new Delete(Bytes.toBytes(0)))).join(); + assertTrace("checkAndMutate"); + } + + @Test + public void testCheckAndMutateList() { + CompletableFuture + .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) + .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) + .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("checkAndMutateList"); + } + + @Test + public void testCheckAndMutateAll() { + table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) + .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) + .build(new Delete(Bytes.toBytes(0))))).join(); + assertTrace("checkAndMutateList"); + } + + @Test + public void testMutateRow() throws Exception { + byte[] row = Bytes.toBytes(0); + RowMutations mutation = new RowMutations(row); + mutation.add(new Delete(row)); + table.mutateRow(mutation).get(); + assertTrace("mutateRow"); + } + + @Test + public void testScanAll() throws IOException { + table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join(); + assertTrace("scanAll"); + } + + @Test + public void testExistsList() { + CompletableFuture + .allOf( + table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("getList"); + } + + @Test + public void testExistsAll() { + table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); + assertTrace("getList"); + } + + @Test + public void testGetList() { + CompletableFuture + .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("getList"); + } + + @Test 
+ public void testGetAll() { + table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); + assertTrace("getList"); + } + + @Test + public void testPutList() { + CompletableFuture + .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), + Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("putList"); + } + + @Test + public void testPutAll() { + table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), + Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); + assertTrace("putList"); + } + + @Test + public void testDeleteList() { + CompletableFuture + .allOf( + table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("deleteList"); + } + + @Test + public void testDeleteAll() { + table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); + assertTrace("deleteList"); + } + + @Test + public void testBatch() { + CompletableFuture + .allOf( + table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("batch"); + } + + @Test + public void testBatchAll() { + table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); + assertTrace("batch"); + } + + @Test + public void testConnClose() throws IOException { + conn.close(); + Waiter.waitFor(CONF, 1000, + () -> traceRule.getSpans().stream() + .anyMatch(span -> span.getName().equals("AsyncConnection.close") && + span.getKind() == Kind.INTERNAL && span.hasEnded())); + SpanData data = traceRule.getSpans().stream() + .filter(s -> s.getName().equals("AsyncConnection.close")).findFirst().get(); + assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index 768de9c..d0da071 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -18,7 +18,19 @@ package org.apache.hadoop.hbase.trace; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Span.Kind; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.attributes.SemanticAttributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private @@ -26,10 +38,132 @@ public final class TraceUtil { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.hbase"; + public static final AttributeKey<String> NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE; + + public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table"); + + public static final AttributeKey<List<String>> REGION_NAMES_KEY = + AttributeKey.stringArrayKey("db.hbase.regions"); + + public static final AttributeKey<String> RPC_SERVICE_KEY = + AttributeKey.stringKey("db.hbase.rpc.service"); + + public static final AttributeKey<String> RPC_METHOD_KEY = + 
AttributeKey.stringKey("db.hbase.rpc.method"); + + public static final AttributeKey<String> SERVER_NAME_KEY = + AttributeKey.stringKey("db.hbase.server.name"); + private TraceUtil() { } public static Tracer getGlobalTracer() { return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME); } + + /** + * Create a {@link Kind#INTERNAL} span. + */ + public static Span createSpan(String name) { + return createSpan(name, Kind.INTERNAL); + } + + /** + * Create a {@link Kind#INTERNAL} span and set table related attributes. + */ + public static Span createTableSpan(String spanName, TableName tableName) { + return createSpan(spanName).setAttribute(NAMESPACE_KEY, tableName.getNamespaceAsString()) + .setAttribute(TABLE_KEY, tableName.getNameAsString()); + } + + /** + * Create a span with the given {@code kind}. Notice that, OpenTelemetry only expects one + * {@link Kind#CLIENT} span and one {@link Kind#SERVER} span for a traced request, so use this + * with caution when you want to create spans with kind other than {@link Kind#INTERNAL}. + */ + private static Span createSpan(String name, Kind kind) { + return getGlobalTracer().spanBuilder(name).setSpanKind(kind).startSpan(); + } + + /** + * Create a span which parent is from remote, i.e, passed through rpc. + * </p> + * We will set the kind of the returned span to {@link Kind#SERVER}, as this should be the top + * most span at server side. + */ + public static Span createRemoteSpan(String name, Context ctx) { + return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(Kind.SERVER).startSpan(); + } + + /** + * Trace an asynchronous operation for a table. + */ + public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action, + String spanName, TableName tableName) { + Span span = createTableSpan(spanName, tableName); + try (Scope scope = span.makeCurrent()) { + CompletableFuture<T> future = action.get(); + endSpan(future, span); + return future; + } + } + + /** + * Trace an asynchronous operation. + */ + public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action, + String spanName) { + Span span = createSpan(spanName); + try (Scope scope = span.makeCurrent()) { + CompletableFuture<T> future = action.get(); + endSpan(future, span); + return future; + } + } + + /** + * Trace an asynchronous operation, and finish the create {@link Span} when all the given + * {@code futures} are completed. + */ + public static <T> List<CompletableFuture<T>> tracedFutures( + Supplier<List<CompletableFuture<T>>> action, String spanName, TableName tableName) { + Span span = createTableSpan(spanName, tableName); + try (Scope scope = span.makeCurrent()) { + List<CompletableFuture<T>> futures = action.get(); + endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span); + return futures; + } + } + + /** + * Finish the {@code span} when the given {@code future} is completed. 
+ */ + private static void endSpan(CompletableFuture<?> future, Span span) { + FutureUtils.addListener(future, (resp, error) -> { + if (error != null) { + span.recordException(error); + span.setStatus(StatusCode.ERROR); + } else { + span.setStatus(StatusCode.OK); + } + span.end(); + }); + } + + public static void trace(Runnable action, String spanName) { + trace(action, () -> createSpan(spanName)); + } + + public static void trace(Runnable action, Supplier<Span> creator) { + Span span = creator.get(); + try (Scope scope = span.makeCurrent()) { + action.run(); + span.setStatus(StatusCode.OK); + } catch (Throwable e) { + span.recordException(e); + span.setStatus(StatusCode.ERROR); + } finally { + span.end(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 203f079..7cc1d2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -122,9 +122,10 @@ public class CallRunner { RpcServer.CurCall.set(call); String serviceName = getServiceName(); String methodName = getMethodName(); - String traceString = serviceName + "." + methodName; - Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString) - .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan(); + Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcServer.callMethod") + .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan() + .setAttribute(TraceUtil.RPC_SERVICE_KEY, serviceName) + .setAttribute(TraceUtil.RPC_METHOD_KEY, methodName); try (Scope traceScope = span.makeCurrent()) { if (!this.rpcServer.isStarted()) { InetSocketAddress address = rpcServer.getListenerAddress(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index db7f052..823005b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -629,8 +629,7 @@ abstract class ServerRpcConnection implements Closeable { }; Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() .extract(Context.current(), header.getTraceInfo(), getter); - Span span = - TraceUtil.getGlobalTracer().spanBuilder("RpcServer.process").setParent(traceCtx).startSpan(); + Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx); try (Scope scope = span.makeCurrent()) { int id = header.getCallId(); if (RpcServer.LOG.isTraceEnabled()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 11978ca..4aca764 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; @@ -448,6 +449,19 @@ public abstract class AbstractTestIPC { } } + 
private SpanData waitSpan(String name) { + Waiter.waitFor(CONF, 1000, + () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name))); + return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get(); + } + + private void assertRpcAttribute(SpanData data, String methodName) { + assertEquals(SERVICE.getDescriptorForType().getName(), + data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY)); + assertEquals(methodName, + data.getAttributes().get(TraceUtil.RPC_METHOD_KEY)); + } + @Test public void testTracing() throws IOException, ServiceException { RpcServer rpcServer = createRpcServer(null, "testRpcServer", @@ -457,9 +471,8 @@ public abstract class AbstractTestIPC { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); - Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) - .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.pause"))); - + assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause"); + assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause"); assertSameTraceId(); for (SpanData data : traceRule.getSpans()) { assertThat( @@ -471,9 +484,8 @@ public abstract class AbstractTestIPC { traceRule.clearSpans(); assertThrows(ServiceException.class, () -> stub.error(null, EmptyRequestProto.getDefaultInstance())); - Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) - .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.error"))); - + assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error"); + assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error"); assertSameTraceId(); for (SpanData data : traceRule.getSpans()) { assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode()); diff --git a/pom.xml b/pom.xml index 75b3c58..1b80645 100755 --- a/pom.xml +++ b/pom.xml @@ -1483,7 +1483,6 @@ <junit.version>4.13</junit.version> <hamcrest.version>1.3</hamcrest.version> <opentelemetry.version>0.13.1</opentelemetry.version> - <opentelemetry-instrumentation.version>0.13.0</opentelemetry-instrumentation.version> <log4j.version>1.2.17</log4j.version> <mockito-core.version>2.28.2</mockito-core.version> <!--Internally we use a different version of protobuf. See hbase-protocol-shaded--> @@ -2192,7 +2191,7 @@ <dependency> <groupId>io.opentelemetry.javaagent</groupId> <artifactId>opentelemetry-javaagent</artifactId> - <version>${opentelemetry-instrumentation.version}</version> + <version>${opentelemetry.version}</version> <classifier>all</classifier> </dependency> <dependency>
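
For illustration, a minimal sketch of how the new TraceUtil helpers are meant to be called from client code. This is not part of the commit: the class and method names below are hypothetical, while the span names mirror the ones asserted in the tests above.

    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.trace.TraceUtil;

    public class TraceUtilUsageSketch {

      // Synchronous work: TraceUtil.trace opens an INTERNAL span, runs the
      // action, sets StatusCode.OK on success (or records the exception and
      // sets StatusCode.ERROR on failure), and always ends the span.
      void traceClose(Runnable closeAction) {
        TraceUtil.trace(closeAction, "AsyncConnection.close");
      }

      // Asynchronous work: the span carries the db.hbase.namespace and
      // db.hbase.table attributes and is only ended when the returned future
      // completes, through the endSpan listener added in TraceUtil.
      CompletableFuture<String> traceGet(TableName tableName,
          CompletableFuture<String> pendingResult) {
        return TraceUtil.tracedFuture(() -> pendingResult, "AsyncTable.get", tableName);
      }
    }

The notable design choice is that tracedFuture ends the span from a future listener rather than in a try-with-resources block, so asynchronous callers get correct span lifetimes without having to manage the Scope or call Span.end() themselves.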