Repository: hbase Updated Branches: refs/heads/branch-1 b49390922 -> 3b28f66bf
HBASE-18004 getRegionLocations needs to be called once in ScannerCallableWithReplicas#call() (Huaxiang Sun) Signed-off-by: Michael Stack <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/262e6bb3 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/262e6bb3 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/262e6bb3 Branch: refs/heads/branch-1 Commit: 262e6bb3fa20fd138cf13a35b44576d6733f9856 Parents: 8ce3f49 Author: Michael Stack <[email protected]> Authored: Thu Jun 15 13:41:01 2017 -0700 Committer: Michael Stack <[email protected]> Committed: Fri Jun 30 15:31:30 2017 -0700 ---------------------------------------------------------------------- .../client/ScannerCallableWithReplicas.java | 52 +++++++++++--------- 1 file changed, 29 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/262e6bb3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 59d0562..10c20d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -72,6 +72,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { private int scannerTimeout; private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>(); private boolean someRPCcancelled = false; //required for testing purposes only + private int regionReplication = 0; public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, @@ -143,36 +144,42 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { //2. We should close the "losing" scanners (scanners other than the ones we hear back // from first) // - RegionLocations rl = null; - try { - rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, - RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, - currentScannerCallable.getRow()); - } catch (RetriesExhaustedException | DoNotRetryIOException e) { - // We cannot get the primary replica region location, it is possible that the region server - // hosting meta table is down, it needs to proceed to try cached replicas directly. - if (cConnection instanceof ConnectionManager.HConnectionImplementation) { - rl = ((ConnectionManager.HConnectionImplementation) cConnection) - .getCachedLocation(tableName, currentScannerCallable.getRow()); - if (rl == null) { + // Since RegionReplication is a table attribute, it wont change as long as table is enabled, + // it just needs to be set once. + + if (regionReplication <= 0) { + RegionLocations rl = null; + try { + rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, + RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, + currentScannerCallable.getRow()); + } catch (RetriesExhaustedException | DoNotRetryIOException e) { + // We cannot get the primary replica region location, it is possible that the region server + // hosting meta table is down, it needs to proceed to try cached replicas directly. + if (cConnection instanceof ConnectionManager.HConnectionImplementation) { + rl = ((ConnectionManager.HConnectionImplementation) cConnection) + .getCachedLocation(tableName, currentScannerCallable.getRow()); + if (rl == null) { + throw e; + } + } else { + // For completeness throw e; } - } else { - // For completeness - throw e; } + regionReplication = rl.size(); } // allocate a boundedcompletion pool of some multiple of number of replicas. // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs = new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>( RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, - rl.size() * 5); + regionReplication * 5); AtomicBoolean done = new AtomicBoolean(false); replicaSwitched.set(false); // submit call for the primary replica. - addCallsForCurrentReplica(cs, rl); + addCallsForCurrentReplica(cs); int startIndex = 0; try { @@ -195,7 +202,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { // If rl's size is 1 or scan's consitency is strong, it needs to throw // out the exception from the primary replica - if ((rl.size() == 1) || (scan.getConsistency() == Consistency.STRONG)) { + if ((regionReplication == 1) || (scan.getConsistency() == Consistency.STRONG)) { // Rethrow the first exception RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); } @@ -208,13 +215,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { } // submit call for the all of the secondaries at once - int endIndex = rl.size(); + int endIndex = regionReplication; if (scan.getConsistency() == Consistency.STRONG) { // When scan's consistency is strong, do not send to the secondaries endIndex = 1; } else { // TODO: this may be an overkill for large region replication - addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); + addCallsForOtherReplicas(cs, 0, regionReplication - 1); } try { @@ -308,15 +315,14 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { } private void addCallsForCurrentReplica( - ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) { + ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); outstandingCallables.add(currentScannerCallable); cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); } private void addCallsForOtherReplicas( - ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, - int min, int max) { + ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) { for (int id = min; id <= max; id++) { if (currentScannerCallable.id == id) {
