Repository: hbase Updated Branches: refs/heads/branch-1 aeecd4df8 -> 1d365f580
HBASE-16515 AsyncProcess has incorrect count of tasks if the backoff policy is enabled (ChiaPing Tsai) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1d365f58 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1d365f58 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1d365f58 Branch: refs/heads/branch-1 Commit: 1d365f580c68675aa8e72f00a61e50c77e5c0b37 Parents: aeecd4d Author: tedyu <[email protected]> Authored: Tue Aug 30 19:40:50 2016 -0700 Committer: tedyu <[email protected]> Committed: Tue Aug 30 19:40:50 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 6 +- .../hadoop/hbase/client/TestAsyncProcess.java | 65 +++++++++++++++++++- 2 files changed, 67 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1d365f58/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index aa3ffc1..647a466 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1066,7 +1066,6 @@ class AsyncProcess { for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) { ServerName server = e.getKey(); MultiAction<Row> multiAction = e.getValue(); - incTaskCounters(multiAction.getRegions(), server); Collection<? 
extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction, numAttempt); // make sure we correctly count the number of runnables before we try to reuse the send @@ -1114,6 +1113,7 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } + incTaskCounters(multiAction.getRegions(), server); SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); @@ -1136,6 +1136,7 @@ class AsyncProcess { List<Runnable> toReturn = new ArrayList<Runnable>(actions.size()); for (DelayingRunner runner : actions.values()) { + incTaskCounters(runner.getActions().getRegions(), server); String traceText = "AsyncProcess.sendMultiAction"; Runnable runnable = addSingleServerRequestHeapSize(server, new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); @@ -1760,7 +1761,8 @@ class AsyncProcess { } } - private void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { + @VisibleForTesting + protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null; boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null; if (!stats && !metrics) { http://git-wip-us.apache.org/repos/asf/hbase/blob/1d365f58/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 34ff84d..bf50ee2 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java 
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -69,6 +69,8 @@ import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost; import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker; import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker; import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -207,6 +209,11 @@ public class TestAsyncProcess { return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); } + + @Override + protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { + // Do nothing for avoiding the NPE if we test the ClientBackoffPolicy. + } @Override protected RpcRetryingCaller<MultiResponse> createCaller( PayloadCarryingServerCallable callable) { @@ -277,7 +284,21 @@ public class TestAsyncProcess { return new CallerWithFailure(ioe); } } - + /** + * Make the backoff time always different on each call. 
+ */ + static class MyClientBackoffPolicy implements ClientBackoffPolicy { + private final Map<ServerName, AtomicInteger> count = new HashMap<>(); + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + AtomicInteger inc = count.get(serverName); + if (inc == null) { + inc = new AtomicInteger(0); + count.put(serverName, inc); + } + return inc.getAndIncrement(); + } + } class MyAsyncProcessWithReplicas extends MyAsyncProcess { private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator()); private long primarySleepMs = 0, replicaSleepMs = 0; @@ -826,6 +847,46 @@ public class TestAsyncProcess { } @Test + public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { + ClusterConnection hc = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false); + testTaskCount(ap); + } + + @Test + public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { + Configuration copyConf = new Configuration(conf); + copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); + MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); + ClusterConnection hc = createHConnection(); + Mockito.when(hc.getConfiguration()).thenReturn(copyConf); + Mockito.when(hc.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); + Mockito.when(hc.getBackoffPolicy()).thenReturn(bp); + MyAsyncProcess ap = new MyAsyncProcess(hc, copyConf, false); + testTaskCount(ap); + } + + private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException { + List<Put> puts = new ArrayList<>(); + for (int i = 0; i != 3; ++i) { + puts.add(createPut(1, true)); + puts.add(createPut(2, true)); + puts.add(createPut(3, true)); + } + ap.submit(DUMMY_TABLE, puts, true, null, false); + ap.waitUntilDone(); + // More time to wait if there are incorrect task count. 
+ TimeUnit.SECONDS.sleep(1); + assertEquals(0, ap.tasksInProgress.get()); + for (AtomicInteger count : ap.taskCounterPerRegion.values()) { + assertEquals(0, count.get()); + } + for (AtomicInteger count : ap.taskCounterPerServer.values()) { + assertEquals(0, count.get()); + } + } + + @Test public void testMaxTask() throws Exception { final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); @@ -1587,4 +1648,4 @@ public class TestAsyncProcess { } t.join(); } -} \ No newline at end of file +}
