Repository: hbase
Updated Branches:
  refs/heads/branch-2 97f0aad66 -> 33ce14cb2

HBASE-19643 Need to update cache location when get error in AsyncBatchRpcRetryingCaller

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/33ce14cb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/33ce14cb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/33ce14cb

Branch: refs/heads/branch-2
Commit: 33ce14cb2997f91882fcca7b51afdeb522c3b8ca
Parents: 97f0aad
Author: Guanghao Zhang <zg...@apache.org>
Authored: Wed Dec 27 17:39:52 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Dec 28 14:44:17 2017 +0800

----------------------------------------------------------------------
 .../client/AsyncBatchRpcRetryingCaller.java | 3 +++
 .../hbase/client/TestAsyncTableBatch.java | 26 ++++++++++++++++++++
 2 files changed, 29 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/33ce14cb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 7249435..8d59130 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -262,6 +262,7 @@ class AsyncBatchRpcRetryingCaller<T> {
     } else if (result instanceof Throwable) {
       Throwable error = translateException((Throwable) result);
       logException(tries, () -> Stream.of(regionReq), error, serverName);
+      conn.getLocator().updateCachedLocation(regionReq.loc, error);
       if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
         failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
           getExtraContextForError(serverName));
@@ -364,6 +365,8 @@ class AsyncBatchRpcRetryingCaller<T> {
       ServerName serverName) {
     Throwable error = translateException(t);
     logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
+    actionsByRegion
+        .forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error));
     if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
       failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
         serverName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/33ce14cb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index 489ad1d..f47e6e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -179,6 +180,31 @@ public class TestAsyncTableBatch {
   }
 
   @Test
+  public void testWithRegionServerFailover() throws Exception {
+    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
+    table.putAll(IntStream.range(0, COUNT)
+        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
+        .collect(Collectors.toList())).get();
+    TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests");
+    Thread.sleep(100);
+    table.putAll(IntStream.range(COUNT, 2 * COUNT)
+        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
+        .collect(Collectors.toList())).get();
+    List<Result> results = table.getAll(
+      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
+        .get();
+    assertEquals(2 * COUNT, results.size());
+    results.forEach(r -> assertFalse(r.isEmpty()));
+    table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i)))
+        .collect(Collectors.toList())).get();
+    results = table.getAll(
+      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
+        .get();
+    assertEquals(2 * COUNT, results.size());
+    results.forEach(r -> assertTrue(r.isEmpty()));
+  }
+
+  @Test
   public void testMixed() throws InterruptedException, ExecutionException, IOException {
     AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
     table.putAll(IntStream.range(0, 7)