Repository: hbase Updated Branches: refs/heads/branch-1.2 be873bbda -> d9127491b
HBASE-14812 Fix ResultBoundedCompletionService deadlock Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d9127491 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d9127491 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d9127491 Branch: refs/heads/branch-1.2 Commit: d9127491bc6c02cdf3a7cdb55b770638bda42134 Parents: be873bb Author: Elliott Clark <ecl...@apache.org> Authored: Fri Nov 13 18:28:12 2015 -0800 Committer: Elliott Clark <ecl...@apache.org> Committed: Tue Nov 17 13:48:58 2015 -0800 ---------------------------------------------------------------------- .../client/ResultBoundedCompletionService.java | 26 +++++++++++++------- .../client/ScannerCallableWithReplicas.java | 22 ++++++++++++----- 2 files changed, 33 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d9127491/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java index eacbe2d..9b32e93 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java @@ -39,12 +39,13 @@ public class ResultBoundedCompletionService<V> { private final Executor executor; private final QueueingFuture<V>[] tasks; // all the tasks private volatile QueueingFuture<V> completed = null; + private volatile boolean cancelled = false; class QueueingFuture<T> implements RunnableFuture<T> { private final RetryingCallable<T> future; private T result = null; private ExecutionException exeEx = null; - private volatile boolean cancelled; + private volatile boolean cancelled = false; private final int callTimeout; private final RpcRetryingCaller<T> retryingCaller; private boolean resultObtained = false; @@ -61,18 +62,21 @@ public class ResultBoundedCompletionService<V> { public void run() { try { if (!cancelled) { - result = - this.retryingCaller.callWithRetries(future, callTimeout); + result = this.retryingCaller.callWithRetries(future, callTimeout); resultObtained = true; } } catch (Throwable t) { exeEx = new ExecutionException(t); } finally { - if (!cancelled && completed == null) { - completed = (QueueingFuture<V>) QueueingFuture.this; - synchronized (tasks) { - tasks.notify(); + synchronized (tasks) { + // If this wasn't canceled then store the result. + if (!cancelled && completed == null) { + completed = (QueueingFuture<V>) QueueingFuture.this; } + + // Notify just in case there was someone waiting and this was canceled. + // That shouldn't happen but better safe than sorry. + tasks.notify(); } } } @@ -145,19 +149,23 @@ public class ResultBoundedCompletionService<V> { public QueueingFuture<V> take() throws InterruptedException { synchronized (tasks) { - while (completed == null) tasks.wait(); + while (completed == null && !cancelled) tasks.wait(); } return completed; } public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException { synchronized (tasks) { - if (completed == null) unit.timedWait(tasks, timeout); + if (completed == null && !cancelled) unit.timedWait(tasks, timeout); } return completed; } public void cancelAll() { + // Grab the lock on tasks so that cancelled is visible everywhere + synchronized (tasks) { + cancelled = true; + } for (QueueingFuture<V> future : tasks) { if (future != null) future.cancel(true); } http://git-wip-us.apache.org/repos/asf/hbase/blob/d9127491/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 740eb91..7418292 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -163,12 +164,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { replicaSwitched.set(false); // submit call for the primary replica. addCallsForCurrentReplica(cs, rl); + try { // wait for the timeout to see whether the primary responds back Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds if (f != null) { - Pair<Result[], ScannerCallable> r = f.get(); + Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS); if (r != null && r.getSecond() != null) { updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); } @@ -180,23 +182,31 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { throw new InterruptedIOException(e.getMessage()); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); + } catch (TimeoutException e) { + throw new InterruptedIOException(e.getMessage()); } + // submit call for the all of the secondaries at once // TODO: this may be an overkill for large region replication addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); + try { - Future<Pair<Result[], ScannerCallable>> f = cs.take(); - Pair<Result[], ScannerCallable> r = f.get(); - if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS); + if (f != null) { + Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS); + if (r != null && r.getSecond() != null) { + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + } + return r == null ? null : r.getFirst(); // great we got an answer } - return r == null ? null : r.getFirst(); // great we got an answer } catch (ExecutionException e) { RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); } catch (CancellationException e) { throw new InterruptedIOException(e.getMessage()); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); + } catch (TimeoutException e) { + throw new InterruptedIOException(e.getMessage()); } finally { // We get there because we were interrupted or because one or more of the // calls succeeded or failed. In all case, we stop all our tasks.