Repository: hbase
Updated Branches:
  refs/heads/branch-1 b62f13454 -> 426bd0977
HBASE-13997 ScannerCallableWithReplicas cause Infinitely blocking (Zephyr Guo and Enis) Conflicts: hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/426bd097 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/426bd097 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/426bd097 Branch: refs/heads/branch-1 Commit: 426bd097775dc9ed18b4f208429eeece0b472e95 Parents: b62f134 Author: Enis Soztutar <e...@apache.org> Authored: Fri Jul 10 16:06:29 2015 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Mon Jul 13 12:24:54 2015 -0700 ---------------------------------------------------------------------- .../client/ScannerCallableWithReplicas.java | 55 +++++---------- .../hadoop/hbase/client/TestClientScanner.java | 72 ++++++++++++++++++++ 2 files changed, 88 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/426bd097/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 1087920..740eb91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefor import java.io.IOException; import java.io.InterruptedIOException; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import 
java.util.concurrent.CancellationException; @@ -158,15 +156,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs = new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>( - new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool, + RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, rl.size() * 5); - List<ExecutionException> exceptions = null; - int submitted = 0, completed = 0; AtomicBoolean done = new AtomicBoolean(false); replicaSwitched.set(false); // submit call for the primary replica. - submitted += addCallsForCurrentReplica(cs, rl); + addCallsForCurrentReplica(cs, rl); try { // wait for the timeout to see whether the primary responds back Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas, @@ -179,11 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { return r == null ? null : r.getFirst(); //great we got a response } } catch (ExecutionException e) { - // the primary call failed with RetriesExhaustedException or DoNotRetryIOException - // but the secondaries might still succeed. Continue on the replica RPCs. 
- exceptions = new ArrayList<ExecutionException>(rl.size()); - exceptions.add(e); - completed++; + RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); } catch (CancellationException e) { throw new InterruptedIOException(e.getMessage()); } catch (InterruptedException e) { @@ -191,24 +183,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { } // submit call for the all of the secondaries at once // TODO: this may be an overkill for large region replication - submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); + addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); try { - while (completed < submitted) { - try { - Future<Pair<Result[], ScannerCallable>> f = cs.take(); - Pair<Result[], ScannerCallable> r = f.get(); - if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); - } - return r == null ? null : r.getFirst(); // great we got an answer - } catch (ExecutionException e) { - // if not cancel or interrupt, wait until all RPC's are done - // one of the tasks failed. Save the exception for later. - if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size()); - exceptions.add(e); - completed++; - } + Future<Pair<Result[], ScannerCallable>> f = cs.take(); + Pair<Result[], ScannerCallable> r = f.get(); + if (r != null && r.getSecond() != null) { + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); } + return r == null ? null : r.getFirst(); // great we got an answer + } catch (ExecutionException e) { + RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); } catch (CancellationException e) { throw new InterruptedIOException(e.getMessage()); } catch (InterruptedException e) { @@ -218,11 +202,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { // calls succeeded or failed. In all case, we stop all our tasks. 
cs.cancelAll(); } - - if (exceptions != null && !exceptions.isEmpty()) { - RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0), - retries); // just rethrow the first exception for now. - } return null; // unreachable } @@ -283,19 +262,18 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage(); } - private int addCallsForCurrentReplica( + private void addCallsForCurrentReplica( ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); outstandingCallables.add(currentScannerCallable); cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); - return 1; } - private int addCallsForOtherReplicas( + private void addCallsForOtherReplicas( ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min, int max) { if (scan.getConsistency() == Consistency.STRONG) { - return 0; // not scheduling on other replicas for strong consistency + return; // not scheduling on other replicas for strong consistency } for (int id = min; id <= max; id++) { if (currentScannerCallable.id == id) { @@ -307,7 +285,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { RetryingRPC retryingOnReplica = new RetryingRPC(s); cs.submit(retryingOnReplica, scannerTimeout, id); } - return max - min + 1; } /** @@ -354,8 +331,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { // and we can't invoke it multiple times at the same time) this.caller = ScannerCallableWithReplicas.this.caller; if (scan.getConsistency() == Consistency.TIMELINE) { - this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf). 
- <Result[]>newCaller(); + this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf) + .<Result[]>newCaller(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/426bd097/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index a91def3..44a742f 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -21,8 +21,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -31,6 +37,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -486,4 +493,69 @@ public class TestClientScanner { assertFalse(cs.advance()); } } + + /** + * Tests the case where all replicas of a region throw an exception. 
It should not cause a hang + * but the exception should propagate to the client + */ + @Test (timeout = 30000) + public void testExceptionsFromReplicasArePropagated() throws IOException { + scan.setConsistency(Consistency.TIMELINE); + + // Mock a caller which calls the callable for ScannerCallableWithReplicas, + // but throws an exception for the actual scanner calls via callWithRetries. + rpcFactory = new MockRpcRetryingCallerFactory(conf); + conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, + MockRpcRetryingCallerFactory.class.getName()); + + // mock 3 replica locations + when(clusterConn.locateRegion((TableName)any(), (byte[])any(), anyBoolean(), + anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null)); + + try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"), + clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + Iterator<Result> iter = scanner.iterator(); + while (iter.hasNext()) { + iter.next(); + } + fail("Should have failed with RetriesExhaustedException"); + } catch (RetriesExhaustedException expected) { + + } + } + + public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory { + + public MockRpcRetryingCallerFactory(Configuration conf) { + super(conf); + } + + @Override + public <T> RpcRetryingCaller<T> newCaller() { + return new RpcRetryingCaller<T>(0, 0, 0) { + @Override + public void cancel() { + } + @Override + public T callWithRetries(RetryingCallable<T> callable, int callTimeout) + throws IOException, RuntimeException { + throw new IOException("Scanner exception"); + } + + @Override + public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) + throws IOException, RuntimeException { + try { + return callable.call(callTimeout); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } + + } + }