Repository: hbase Updated Branches: refs/heads/HBASE-19397 2172026ca -> e424657c9 (forced update)
HBASE-19641 AsyncHBaseAdmin should use exponential backoff when polling the procedure result Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1fa3637b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1fa3637b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1fa3637b Branch: refs/heads/HBASE-19397 Commit: 1fa3637b4d0020b1c4387610e8aa6b970c0138b8 Parents: a47afc8 Author: zhangduo <zhang...@apache.org> Authored: Wed Jan 3 16:41:21 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Jan 3 18:32:54 2018 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 57 ++++++++++---------- 1 file changed, 27 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1fa3637b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 7a8d081..ceda280 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -89,6 +89,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.TimerTask; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; @@ -2553,40 +2554,36 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { future.completeExceptionally(error); return; } - getProcedureResult(procId, future); + getProcedureResult(procId, future, 0); }); return future; } - private void getProcedureResult(final long procId, CompletableFuture<Void> future) { - this.<GetProcedureResultResponse> newMasterCaller() - .action( - (controller, stub) -> this - .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( - controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), - (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) - .call() - .whenComplete( - (response, error) -> { - if (error != null) { - LOG.warn("failed to get the procedure result procId=" + procId, - ConnectionUtils.translateException(error)); - retryTimer.newTimeout(t -> getProcedureResult(procId, future), pauseNs, - TimeUnit.NANOSECONDS); - return; - } - if (response.getState() == GetProcedureResultResponse.State.RUNNING) { - retryTimer.newTimeout(t -> getProcedureResult(procId, future), pauseNs, - TimeUnit.NANOSECONDS); - return; - } - if (response.hasException()) { - IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); - future.completeExceptionally(ioe); - } else { - future.complete(null); - } - }); + private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { + this.<GetProcedureResultResponse> newMasterCaller().action((controller, stub) -> this + .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( + controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), + (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) + .call().whenComplete((response, error) -> { + if (error != null) { + LOG.warn("failed to get the procedure result procId={}", procId, + ConnectionUtils.translateException(error)); + retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); + return; + } + if (response.getState() == GetProcedureResultResponse.State.RUNNING) { + retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); + return; + } + if (response.hasException()) { + IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); + future.completeExceptionally(ioe); + } else { + future.complete(null); + } + }); } private <T> CompletableFuture<T> failedFuture(Throwable error) {