This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 87b00c9 HBASE-22267 Implement client push back for async client
87b00c9 is described below
commit 87b00c96d2e883a642953c9d79af25649f450558
Author: zhangduo <[email protected]>
AuthorDate: Sun Apr 21 11:58:52 2019 +0800
HBASE-22267 Implement client push back for async client
---
.../hbase/client/AsyncBatchRpcRetryingCaller.java | 119 ++++++++----
.../hadoop/hbase/client/AsyncConnectionImpl.java | 15 ++
.../hbase/client/AsyncRequestFutureImpl.java | 42 +----
.../hadoop/hbase/client/ConnectionUtils.java | 23 +++
.../hadoop/hbase/client/MetricsConnection.java | 8 +-
.../hadoop/hbase/client/RawAsyncTableImpl.java | 12 +-
.../hbase/client/ServerStatisticTracker.java | 10 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 2 +-
...ntPushback.java => ClientPushbackTestBase.java} | 117 +++++-------
.../hbase/client/TestAsyncClientPushback.java | 96 ++++++++++
.../hadoop/hbase/client/TestClientPushback.java | 201 +++++----------------
11 files changed, 332 insertions(+), 313 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index f9bcf74..e429422 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import
org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -134,6 +136,10 @@ class AsyncBatchRpcRetryingCaller<T> {
() -> new RegionRequest(loc)).actions.add(action);
}
+ public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
+ actionsByRegion.put(regionName, regionReq);
+ }
+
public int getPriority() {
return actionsByRegion.values().stream().flatMap(rr ->
rr.actions.stream())
.mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
@@ -298,6 +304,8 @@ class AsyncBatchRpcRetryingCaller<T> {
private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int
tries,
ServerName serverName, MultiResponse resp) {
+ ConnectionUtils.updateStats(conn.getStatisticsTracker(),
conn.getConnectionMetrics(),
+ serverName, resp);
List<Action> failedActions = new ArrayList<>();
MutableBoolean retryImmediately = new MutableBoolean(false);
actionsByRegion.forEach((rn, regionReq) -> {
@@ -333,55 +341,88 @@ class AsyncBatchRpcRetryingCaller<T> {
}
}
- private void send(Map<ServerName, ServerRequest> actionsByServer, int tries)
{
+ private void sendToServer(ServerName serverName, ServerRequest serverReq,
int tries) {
long remainingNs;
if (operationTimeoutNs > 0) {
remainingNs = remainingTimeNs();
if (remainingNs <= 0) {
- failAll(actionsByServer.values().stream().flatMap(m ->
m.actionsByRegion.values().stream())
- .flatMap(r -> r.actions.stream()), tries);
+ failAll(serverReq.actionsByRegion.values().stream().flatMap(r ->
r.actions.stream()),
+ tries);
return;
}
} else {
remainingNs = Long.MAX_VALUE;
}
- actionsByServer.forEach((sn, serverReq) -> {
- ClientService.Interface stub;
- try {
- stub = conn.getRegionServerStub(sn);
- } catch (IOException e) {
- onError(serverReq.actionsByRegion, tries, e, sn);
- return;
- }
- ClientProtos.MultiRequest req;
- List<CellScannable> cells = new ArrayList<>();
- // Map from a created RegionAction to the original index for a
RowMutations within
- // the original list of actions. This will be used to process the
results when there
- // is RowMutations in the action list.
- Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
- try {
- req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
- } catch (IOException e) {
- onError(serverReq.actionsByRegion, tries, e, sn);
- return;
- }
- HBaseRpcController controller =
conn.rpcControllerFactory.newController();
- resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
- calcPriority(serverReq.getPriority(), tableName));
- if (!cells.isEmpty()) {
- controller.setCellScanner(createCellScanner(cells));
+ ClientService.Interface stub;
+ try {
+ stub = conn.getRegionServerStub(serverName);
+ } catch (IOException e) {
+ onError(serverReq.actionsByRegion, tries, e, serverName);
+ return;
+ }
+ ClientProtos.MultiRequest req;
+ List<CellScannable> cells = new ArrayList<>();
+ // Map from a created RegionAction to the original index for a
RowMutations within
+ // the original list of actions. This will be used to process the results
when there
+ // is RowMutations in the action list.
+ Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
+ try {
+ req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
+ } catch (IOException e) {
+ onError(serverReq.actionsByRegion, tries, e, serverName);
+ return;
+ }
+ HBaseRpcController controller = conn.rpcControllerFactory.newController();
+ resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
+ calcPriority(serverReq.getPriority(), tableName));
+ if (!cells.isEmpty()) {
+ controller.setCellScanner(createCellScanner(cells));
+ }
+ stub.multi(controller, req, resp -> {
+ if (controller.failed()) {
+ onError(serverReq.actionsByRegion, tries, controller.getFailed(),
serverName);
+ } else {
+ try {
+ onComplete(serverReq.actionsByRegion, tries, serverName,
ResponseConverter.getResults(req,
+ rowMutationsIndexMap, resp, controller.cellScanner()));
+ } catch (Exception e) {
+ onError(serverReq.actionsByRegion, tries, e, serverName);
+ return;
+ }
}
- stub.multi(controller, req, resp -> {
- if (controller.failed()) {
- onError(serverReq.actionsByRegion, tries, controller.getFailed(),
sn);
+ });
+ }
+
+ // We will make use of the ServerStatisticTracker to determine whether we
need to delay a bit,
+ // based on the load of the region server and the region.
+ private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int
tries) {
+ Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
+ Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
+ if (!optStats.isPresent()) {
+ actionsByServer.forEach((serverName, serverReq) -> {
+ metrics.ifPresent(MetricsConnection::incrNormalRunners);
+ sendToServer(serverName, serverReq, tries);
+ });
+ return;
+ }
+ ServerStatisticTracker stats = optStats.get();
+ ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
+ actionsByServer.forEach((serverName, serverReq) -> {
+ ServerStatistics serverStats = stats.getStats(serverName);
+ Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
+ serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
+ long backoff = backoffPolicy.getBackoffTime(serverName, regionName,
serverStats);
+ groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
+ .setRegionRequest(regionName, regionReq);
+ });
+ groupByBackoff.forEach((backoff, sr) -> {
+ if (backoff > 0) {
+ metrics.ifPresent(m ->
m.incrDelayRunnersAndUpdateDelayInterval(backoff));
+ retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries),
backoff,
+ TimeUnit.MILLISECONDS);
} else {
- try {
- onComplete(serverReq.actionsByRegion, tries, sn,
ResponseConverter.getResults(req,
- rowMutationsIndexMap, resp, controller.cellScanner()));
- } catch (Exception e) {
- onError(serverReq.actionsByRegion, tries, e, sn);
- return;
- }
+ metrics.ifPresent(MetricsConnection::incrNormalRunners);
+ sendToServer(serverName, sr, tries);
}
});
});
@@ -454,7 +495,7 @@ class AsyncBatchRpcRetryingCaller<T> {
}))
.toArray(CompletableFuture[]::new)), (v, r) -> {
if (!actionsByServer.isEmpty()) {
- send(actionsByServer, tries);
+ sendOrDelay(actionsByServer, tries);
}
if (!locateFailed.isEmpty()) {
tryResubmit(locateFailed.stream(), tries, false);
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index f046e7a..7d59984 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -101,6 +103,9 @@ class AsyncConnectionImpl implements AsyncConnection {
private final AtomicReference<CompletableFuture<MasterService.Interface>>
masterStubMakeFuture =
new AtomicReference<>();
+ private final Optional<ServerStatisticTracker> stats;
+ private final ClientBackoffPolicy backoffPolicy;
+
private ChoreService authService;
private volatile boolean closed = false;
@@ -133,6 +138,8 @@ class AsyncConnectionImpl implements AsyncConnection {
} else {
nonceGenerator = NO_NONCE_GENERATOR;
}
+ this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
+ this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
}
private void spawnRenewalChore(final UserGroupInformation user) {
@@ -233,6 +240,14 @@ class AsyncConnectionImpl implements AsyncConnection {
masterStub.compareAndSet(stub, null);
}
+ Optional<ServerStatisticTracker> getStatisticsTracker() {
+ return stats;
+ }
+
+ ClientBackoffPolicy getBackoffPolicy() {
+ return backoffPolicy;
+ }
+
@Override
public AsyncTableBuilder<AdvancedScanResultConsumer>
getTableBuilder(TableName tableName) {
return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName,
connConf) {
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 525033d..e46a50e 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -28,6 +28,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -55,9 +56,6 @@ import org.slf4j.LoggerFactory;
import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-
/**
* The context, and return value, for a single submit/submitAll call.
* Note on how this class (one AP submit) works. Initially, all requests are
split into groups
@@ -614,8 +612,8 @@ class AsyncRequestFutureImpl<CResult> implements
AsyncRequestFuture {
traceText = "AsyncProcess.clientBackoff.sendMultiAction";
runnable = runner;
if (asyncProcess.connection.getConnectionMetrics() != null) {
- asyncProcess.connection.getConnectionMetrics().incrDelayRunners();
-
asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
+ asyncProcess.connection.getConnectionMetrics()
+ .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
}
} else {
if (asyncProcess.connection.getConnectionMetrics() != null) {
@@ -802,19 +800,16 @@ class AsyncRequestFutureImpl<CResult> implements
AsyncRequestFuture {
* @param responses - the response, if any
* @param numAttempt - the attempt
*/
- private void receiveMultiAction(MultiAction multiAction,
- ServerName server, MultiResponse responses,
int numAttempt) {
+ private void receiveMultiAction(MultiAction multiAction, ServerName server,
+ MultiResponse responses, int numAttempt) {
assert responses != null;
-
- Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
- updateStats(server, results);
-
+ updateStats(server, responses);
// Success or partial success
// Analyze detailed results. We can still have individual failures to be
redo.
// two specific throwables are managed:
// - DoNotRetryIOException: we continue to retry for other actions
// - RegionMovedException: we update the cache with the new region
location
-
+ Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
List<Action> toReplay = new ArrayList<>();
Throwable lastException = null;
int failureCount = 0;
@@ -926,26 +921,9 @@ class AsyncRequestFutureImpl<CResult> implements
AsyncRequestFuture {
}
@VisibleForTesting
- protected void updateStats(ServerName server, Map<byte[],
MultiResponse.RegionResult> results) {
- boolean metrics = asyncProcess.connection.getConnectionMetrics() != null;
- boolean stats = asyncProcess.connection.getStatisticsTracker() != null;
- if (!stats && !metrics) {
- return;
- }
- for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats :
results.entrySet()) {
- byte[] regionName = regionStats.getKey();
- ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
- if (stat == null) {
- LOG.error("No ClientProtos.RegionLoadStats found for server=" + server
- + ", region=" + Bytes.toStringBinary(regionName));
- continue;
- }
- RegionLoadStats regionLoadstats =
ProtobufUtil.createRegionLoadStats(stat);
-
ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(),
server,
- regionName, regionLoadstats);
-
ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(),
- server, regionName, regionLoadstats);
- }
+ protected void updateStats(ServerName server, MultiResponse resp) {
+
ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()),
+ Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()),
server, resp);
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 6b06a7f..4a2fa3a 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -62,8 +62,10 @@ import
org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -672,4 +674,25 @@ public final class ConnectionUtils {
}
}
}
+
+ static void updateStats(Optional<ServerStatisticTracker> optStats,
+ Optional<MetricsConnection> optMetrics, ServerName serverName,
MultiResponse resp) {
+ if (!optStats.isPresent() && !optMetrics.isPresent()) {
+ // ServerStatisticTracker and MetricsConnection are both not present,
just return
+ return;
+ }
+ resp.getResults().forEach((regionName, regionResult) -> {
+ ClientProtos.RegionLoadStats stat = regionResult.getStat();
+ if (stat == null) {
+ LOG.error("No ClientProtos.RegionLoadStats found for server={},
region={}", serverName,
+ Bytes.toStringBinary(regionName));
+ return;
+ }
+ RegionLoadStats regionLoadStats =
ProtobufUtil.createRegionLoadStats(stat);
+ optStats.ifPresent(
+ stats -> ResultStatsUtil.updateStats(stats, serverName, regionName,
regionLoadStats));
+ optMetrics.ifPresent(
+ metrics -> ResultStatsUtil.updateStats(metrics, serverName,
regionName, regionLoadStats));
+ });
+ }
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index c62a712..d842f90 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -421,13 +421,9 @@ public class MetricsConnection implements
StatisticTrackable {
this.runnerStats.incrNormalRunners();
}
- /** Increment the number of delay runner counts. */
- public void incrDelayRunners() {
+ /** Increment the number of delay runner counts and update delay interval of
delay runner. */
+ public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
this.runnerStats.incrDelayRunners();
- }
-
- /** Update delay interval of delay runner. */
- public void updateDelayInterval(long interval) {
this.runnerStats.updateDelayInterval(interval);
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 688c86f..6b87653 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -357,8 +357,8 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
preCheck();
return RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean>
mutateRow(controller, loc,
- stub, mutation,
+ .action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean>
mutateRow(controller,
+ loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family,
qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()),
timeRange, rm),
resp -> resp.getExists()))
@@ -373,7 +373,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
// We need the MultiRequest when constructing the
org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction
of call method.
- private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController
controller,
+ private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController
controller,
HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
Converter<MultiRequest, byte[], RowMutations> reqConvert,
Function<Result, RESP> respConverter) {
@@ -391,6 +391,8 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
try {
org.apache.hadoop.hbase.client.MultiResponse multiResp =
ResponseConverter.getResults(req, resp,
controller.cellScanner());
+ ConnectionUtils.updateStats(conn.getStatisticsTracker(),
conn.getConnectionMetrics(),
+ loc.getServerName(), multiResp);
Throwable ex = multiResp.getException(regionName);
if (ex != null) {
future.completeExceptionally(ex instanceof IOException ? ex
@@ -415,8 +417,8 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(),
writeRpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Void>
mutateRow(controller, loc, stub,
- mutation, (rn, rm) -> {
+ .action((controller, loc, stub) -> this.<Void> mutateRow(controller,
loc, stub, mutation,
+ (rn, rm) -> {
RegionAction.Builder regionMutationBuilder =
RequestConverter.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
index c5c7375..12e3e3b 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
@@ -19,15 +19,12 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
-import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Tracks the statistics for multiple regions
@@ -53,9 +50,4 @@ public class ServerStatisticTracker implements
StatisticTrackable {
}
return new ServerStatisticTracker();
}
-
- @VisibleForTesting
- ServerStatistics getServerStatsForTesting(ServerName server) {
- return stats.get(server);
- }
}
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 71b21ac..81dcc46 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -271,7 +271,7 @@ public class TestAsyncProcess {
}
@Override
- protected void updateStats(ServerName server, Map<byte[],
MultiResponse.RegionResult> results) {
+ protected void updateStats(ServerName server, MultiResponse resp) {
// Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java
similarity index 63%
copy from
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
copy to
hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java
index d6f32f5..a7202b8 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java
@@ -24,51 +24,42 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test that we can actually send and use region metrics to slowdown client
writes
*/
-@Category(MediumTests.class)
-public class TestClientPushback {
+public abstract class ClientPushbackTestBase {
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestClientPushback.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ClientPushbackTestBase.class);
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
- private static final Logger LOG =
LoggerFactory.getLogger(TestClientPushback.class);
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
- private static final TableName tableName =
TableName.valueOf("client-pushback");
+ protected static final TableName tableName =
TableName.valueOf("client-pushback");
private static final byte[] family = Bytes.toBytes("f");
private static final byte[] qualifier = Bytes.toBytes("q");
private static final long flushSizeBytes = 512;
@BeforeClass
- public static void setupCluster() throws Exception{
+ public static void setupCluster() throws Exception {
Configuration conf = UTIL.getConfiguration();
// enable backpressure
conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
@@ -79,52 +70,59 @@ public class TestClientPushback {
// load
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
// ensure we block the flushes when we are double that flushsize
- conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
+ conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+ HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
UTIL.startMiniCluster(1);
UTIL.createTable(tableName, family);
}
@AfterClass
- public static void teardownCluster() throws Exception{
+ public static void cleanupCluster() throws Exception {
UTIL.shutdownMiniCluster();
}
- @Test
- public void testClientTracksServerPushback() throws Exception{
- Configuration conf = UTIL.getConfiguration();
+ protected abstract ClientBackoffPolicy getBackoffPolicy() throws IOException;
+
+ protected abstract ServerStatisticTracker getStatisticsTracker() throws
IOException;
+
+ protected abstract MetricsConnection getConnectionMetrics() throws
IOException;
- ClusterConnection conn = (ClusterConnection)
ConnectionFactory.createConnection(conf);
- BufferedMutatorImpl mutator = (BufferedMutatorImpl)
conn.getBufferedMutator(tableName);
+ protected abstract void mutate(Put put) throws IOException;
+ protected abstract void mutate(Put put, AtomicLong endTime, CountDownLatch
latch)
+ throws IOException;
+
+ protected abstract void mutateRow(RowMutations mutations) throws IOException;
+
+ @Test
+ public void testClientTracksServerPushback() throws Exception {
HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
Region region = rs.getRegions(tableName).get(0);
- LOG.debug("Writing some data to "+tableName);
+ LOG.debug("Writing some data to " + tableName);
// write some data
Put p = new Put(Bytes.toBytes("row"));
p.addColumn(family, qualifier, Bytes.toBytes("value1"));
- mutator.mutate(p);
- mutator.flush();
+ mutate(p);
// get the current load on RS. Hopefully memstore isn't flushed since we
wrote the the data
- int load = (int) ((region.getMemStoreHeapSize() * 100)
- / flushSizeBytes);
- LOG.debug("Done writing some data to "+tableName);
+ int load = (int) ((region.getMemStoreHeapSize() * 100) / flushSizeBytes);
+ LOG.debug("Done writing some data to " + tableName);
// get the stats for the region hosting our table
- ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
+ ClientBackoffPolicy backoffPolicy = getBackoffPolicy();
assertTrue("Backoff policy is not correctly configured",
backoffPolicy instanceof ExponentialClientBackoffPolicy);
- ServerStatisticTracker stats = conn.getStatisticsTracker();
- assertNotNull( "No stats configured for the client!", stats);
+ ServerStatisticTracker stats = getStatisticsTracker();
+ assertNotNull("No stats configured for the client!", stats);
// get the names so we can query the stats
ServerName server = rs.getServerName();
byte[] regionName = region.getRegionInfo().getRegionName();
// check to see we found some load on the memstore
- ServerStatistics serverStats = stats.getServerStatsForTesting(server);
+ ServerStatistics serverStats = stats.getStats(server);
ServerStatistics.RegionStatistics regionStats =
serverStats.getStatsForRegion(regionName);
assertEquals("We did not find some load on the memstore", load,
regionStats.getMemStoreLoadPercent());
@@ -134,45 +132,29 @@ public class TestClientPushback {
LOG.debug("Backoff calculated for " +
region.getRegionInfo().getRegionNameAsString() + " @ " +
server + " is " + backoffTime);
- // Reach into the connection and submit work directly to AsyncProcess so
we can
- // monitor how long the submission was delayed via a callback
- List<Row> ops = new ArrayList<>(1);
- ops.add(p);
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicLong endTime = new AtomicLong();
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicLong endTime = new AtomicLong();
long startTime = EnvironmentEdgeManager.currentTime();
- Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) ->
{
- endTime.set(EnvironmentEdgeManager.currentTime());
- latch.countDown();
- };
- AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
- .setPool(mutator.getPool())
- .setTableName(tableName)
- .setRowAccess(ops)
- .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
-
.setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
- .setRpcTimeout(60 * 1000)
- .build();
- mutator.getAsyncProcess().submit(task);
+ mutate(p, endTime, latch);
// Currently the ExponentialClientBackoffPolicy under these test conditions
// produces a backoffTime of 151 milliseconds. This is long enough so the
// wait and related checks below are reasonable. Revisit if the backoff
// time reported by above debug logging has significantly deviated.
+ MetricsConnection metrics = getConnectionMetrics();
String name = server.getServerName() + "," +
Bytes.toStringBinary(regionName);
- MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics().
- serverStats.get(server).get(regionName);
+ MetricsConnection.RegionStats rsStats =
metrics.serverStats.get(server).get(regionName);
assertEquals(name, rsStats.name);
assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(),
- (double)regionStats.getHeapOccupancyPercent(), 0.1 );
+ (double) regionStats.getHeapOccupancyPercent(), 0.1);
assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(),
- (double)regionStats.getMemStoreLoadPercent(), 0.1);
+ (double) regionStats.getMemStoreLoadPercent(), 0.1);
- MetricsConnection.RunnerStats runnerStats =
conn.getConnectionMetrics().runnerStats;
+ MetricsConnection.RunnerStats runnerStats = metrics.runnerStats;
assertEquals(1, runnerStats.delayRunners.getCount());
assertEquals(1, runnerStats.normalRunners.getCount());
- assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(),
- (double)backoffTime, 0.1);
+ assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(),
(double) backoffTime,
+ 0.1);
latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
assertNotEquals("AsyncProcess did not submit the work time", 0,
endTime.get());
@@ -181,9 +163,6 @@ public class TestClientPushback {
@Test
public void testMutateRowStats() throws IOException {
- Configuration conf = UTIL.getConfiguration();
- ClusterConnection conn = (ClusterConnection)
ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(tableName);
HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
Region region = rs.getRegions(tableName).get(0);
@@ -191,19 +170,19 @@ public class TestClientPushback {
Put p = new Put(Bytes.toBytes("row"));
p.addColumn(family, qualifier, Bytes.toBytes("value2"));
mutations.add(p);
- table.mutateRow(mutations);
+ mutateRow(mutations);
- ServerStatisticTracker stats = conn.getStatisticsTracker();
- assertNotNull( "No stats configured for the client!", stats);
+ ServerStatisticTracker stats = getStatisticsTracker();
+ assertNotNull("No stats configured for the client!", stats);
// get the names so we can query the stats
ServerName server = rs.getServerName();
byte[] regionName = region.getRegionInfo().getRegionName();
// check to see we found some load on the memstore
- ServerStatistics serverStats = stats.getServerStatsForTesting(server);
+ ServerStatistics serverStats = stats.getStats(server);
ServerStatistics.RegionStatistics regionStats =
serverStats.getStatsForRegion(regionName);
assertNotNull(regionStats);
assertTrue(regionStats.getMemStoreLoadPercent() > 0);
- }
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java
new file mode 100644
index 0000000..cc030d8
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncClientPushback extends ClientPushbackTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncClientPushback.class);
+
+ private AsyncConnectionImpl conn;
+
+ private AsyncBufferedMutator mutator;
+
+ @Before
+ public void setUp() throws Exception {
+ conn =
+ (AsyncConnectionImpl)
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+ mutator = conn.getBufferedMutator(tableName);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ Closeables.close(mutator, true);
+ Closeables.close(conn, true);
+ }
+
+ @Override
+ protected ClientBackoffPolicy getBackoffPolicy() throws IOException {
+ return conn.getBackoffPolicy();
+ }
+
+ @Override
+ protected ServerStatisticTracker getStatisticsTracker() throws IOException {
+ return conn.getStatisticsTracker().get();
+ }
+
+ @Override
+ protected MetricsConnection getConnectionMetrics() throws IOException {
+ return conn.getConnectionMetrics().get();
+ }
+
+ @Override
+ protected void mutate(Put put) throws IOException {
+ CompletableFuture<?> future = mutator.mutate(put);
+ mutator.flush();
+ future.join();
+ }
+
+ @Override
+ protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch)
throws IOException {
+ FutureUtils.addListener(mutator.mutate(put), (r, e) -> {
+ endTime.set(EnvironmentEdgeManager.currentTime());
+ latch.countDown();
+ });
+ mutator.flush();
+ }
+
+ @Override
+ protected void mutateRow(RowMutations mutations) throws IOException {
+ conn.getTable(tableName).mutateRow(mutations).join();
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index d6f32f5..e789349 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -17,193 +17,90 @@
*/
package org.apache.hadoop.hbase.client;
-import static
org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
-import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
-import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * Test that we can actually send and use region metrics to slowdown client
writes
- */
-@Category(MediumTests.class)
-public class TestClientPushback {
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestClientPushback extends ClientPushbackTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestClientPushback.class);
+ HBaseClassTestRule.forClass(TestClientPushback.class);
- private static final Logger LOG =
LoggerFactory.getLogger(TestClientPushback.class);
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private ConnectionImplementation conn;
- private static final TableName tableName =
TableName.valueOf("client-pushback");
- private static final byte[] family = Bytes.toBytes("f");
- private static final byte[] qualifier = Bytes.toBytes("q");
- private static final long flushSizeBytes = 512;
+ private BufferedMutatorImpl mutator;
- @BeforeClass
- public static void setupCluster() throws Exception{
- Configuration conf = UTIL.getConfiguration();
- // enable backpressure
- conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
- // use the exponential backoff policy
- conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS,
ExponentialClientBackoffPolicy.class,
- ClientBackoffPolicy.class);
- // turn the memstore size way down so we don't need to write a lot to see
changes in memstore
- // load
- conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
- // ensure we block the flushes when we are double that flushsize
- conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
- conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
- UTIL.startMiniCluster(1);
- UTIL.createTable(tableName, family);
+ @Before
+ public void setUp() throws IOException {
+ conn = (ConnectionImplementation)
ConnectionFactory.createConnection(UTIL.getConfiguration());
+ mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName);
}
- @AfterClass
- public static void teardownCluster() throws Exception{
- UTIL.shutdownMiniCluster();
+ @After
+ public void tearDown() throws IOException {
+ Closeables.close(mutator, true);
+ Closeables.close(conn, true);
}
- @Test
- public void testClientTracksServerPushback() throws Exception{
- Configuration conf = UTIL.getConfiguration();
+ @Override
+ protected ClientBackoffPolicy getBackoffPolicy() throws IOException {
+ return conn.getBackoffPolicy();
+ }
- ClusterConnection conn = (ClusterConnection)
ConnectionFactory.createConnection(conf);
- BufferedMutatorImpl mutator = (BufferedMutatorImpl)
conn.getBufferedMutator(tableName);
+ @Override
+ protected ServerStatisticTracker getStatisticsTracker() throws IOException {
+ return conn.getStatisticsTracker();
+ }
- HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
- Region region = rs.getRegions(tableName).get(0);
+ @Override
+ protected MetricsConnection getConnectionMetrics() throws IOException {
+ return conn.getConnectionMetrics();
+ }
- LOG.debug("Writing some data to "+tableName);
- // write some data
- Put p = new Put(Bytes.toBytes("row"));
- p.addColumn(family, qualifier, Bytes.toBytes("value1"));
- mutator.mutate(p);
+ @Override
+ protected void mutate(Put put) throws IOException {
+ mutator.mutate(put);
mutator.flush();
+ }
- // get the current load on RS. Hopefully memstore isn't flushed since we
wrote the the data
- int load = (int) ((region.getMemStoreHeapSize() * 100)
- / flushSizeBytes);
- LOG.debug("Done writing some data to "+tableName);
-
- // get the stats for the region hosting our table
- ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
- assertTrue("Backoff policy is not correctly configured",
- backoffPolicy instanceof ExponentialClientBackoffPolicy);
-
- ServerStatisticTracker stats = conn.getStatisticsTracker();
- assertNotNull( "No stats configured for the client!", stats);
- // get the names so we can query the stats
- ServerName server = rs.getServerName();
- byte[] regionName = region.getRegionInfo().getRegionName();
-
- // check to see we found some load on the memstore
- ServerStatistics serverStats = stats.getServerStatsForTesting(server);
- ServerStatistics.RegionStatistics regionStats =
serverStats.getStatsForRegion(regionName);
- assertEquals("We did not find some load on the memstore", load,
- regionStats.getMemStoreLoadPercent());
- // check that the load reported produces a nonzero delay
- long backoffTime = backoffPolicy.getBackoffTime(server, regionName,
serverStats);
- assertNotEquals("Reported load does not produce a backoff", 0,
backoffTime);
- LOG.debug("Backoff calculated for " +
region.getRegionInfo().getRegionNameAsString() + " @ " +
- server + " is " + backoffTime);
-
+ @Override
+ protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch)
throws IOException {
// Reach into the connection and submit work directly to AsyncProcess so
we can
// monitor how long the submission was delayed via a callback
List<Row> ops = new ArrayList<>(1);
- ops.add(p);
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicLong endTime = new AtomicLong();
- long startTime = EnvironmentEdgeManager.currentTime();
+ ops.add(put);
Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) ->
{
- endTime.set(EnvironmentEdgeManager.currentTime());
- latch.countDown();
+ endTime.set(EnvironmentEdgeManager.currentTime());
+ latch.countDown();
};
- AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
- .setPool(mutator.getPool())
- .setTableName(tableName)
- .setRowAccess(ops)
- .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
-
.setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
- .setRpcTimeout(60 * 1000)
- .build();
+ AsyncProcessTask<Result> task =
+
AsyncProcessTask.newBuilder(callback).setPool(mutator.getPool()).setTableName(tableName)
+
.setRowAccess(ops).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
+
.setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
+ .setRpcTimeout(60 * 1000).build();
mutator.getAsyncProcess().submit(task);
- // Currently the ExponentialClientBackoffPolicy under these test conditions
- // produces a backoffTime of 151 milliseconds. This is long enough so the
- // wait and related checks below are reasonable. Revisit if the backoff
- // time reported by above debug logging has significantly deviated.
- String name = server.getServerName() + "," +
Bytes.toStringBinary(regionName);
- MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics().
- serverStats.get(server).get(regionName);
- assertEquals(name, rsStats.name);
- assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(),
- (double)regionStats.getHeapOccupancyPercent(), 0.1 );
- assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(),
- (double)regionStats.getMemStoreLoadPercent(), 0.1);
-
- MetricsConnection.RunnerStats runnerStats =
conn.getConnectionMetrics().runnerStats;
-
- assertEquals(1, runnerStats.delayRunners.getCount());
- assertEquals(1, runnerStats.normalRunners.getCount());
- assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(),
- (double)backoffTime, 0.1);
-
- latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
- assertNotEquals("AsyncProcess did not submit the work time", 0,
endTime.get());
- assertTrue("AsyncProcess did not delay long enough", endTime.get() -
startTime >= backoffTime);
}
- @Test
- public void testMutateRowStats() throws IOException {
- Configuration conf = UTIL.getConfiguration();
- ClusterConnection conn = (ClusterConnection)
ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(tableName);
- HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
- Region region = rs.getRegions(tableName).get(0);
-
- RowMutations mutations = new RowMutations(Bytes.toBytes("row"));
- Put p = new Put(Bytes.toBytes("row"));
- p.addColumn(family, qualifier, Bytes.toBytes("value2"));
- mutations.add(p);
- table.mutateRow(mutations);
-
- ServerStatisticTracker stats = conn.getStatisticsTracker();
- assertNotNull( "No stats configured for the client!", stats);
- // get the names so we can query the stats
- ServerName server = rs.getServerName();
- byte[] regionName = region.getRegionInfo().getRegionName();
-
- // check to see we found some load on the memstore
- ServerStatistics serverStats = stats.getServerStatsForTesting(server);
- ServerStatistics.RegionStatistics regionStats =
serverStats.getStatsForRegion(regionName);
-
- assertNotNull(regionStats);
- assertTrue(regionStats.getMemStoreLoadPercent() > 0);
+ @Override
+ protected void mutateRow(RowMutations mutations) throws IOException {
+ try (Table table = conn.getTable(tableName)) {
+ table.mutateRow(mutations);
}
+ }
}